Skip to content

Commit 56990fe

Browse files
eakmanrqizeigerman
authored andcommitted
fix: respect disable_restatement remove intervals across env (#3838)
1 parent b207702 commit 56990fe

File tree

6 files changed

+136
-4
lines changed

6 files changed

+136
-4
lines changed

sqlmesh/core/plan/definition.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,11 @@ def to_evaluatable(self) -> EvaluatablePlan:
255255
models_to_backfill=self.models_to_backfill,
256256
interval_end_per_model=self.interval_end_per_model,
257257
execution_time=self.execution_time,
258+
disabled_restatement_models={
259+
s.name
260+
for s in self.snapshots.values()
261+
if s.is_model and s.model.disable_restatement
262+
},
258263
)
259264

260265
@cached_property
@@ -285,6 +290,7 @@ class EvaluatablePlan(PydanticModel):
285290
models_to_backfill: t.Optional[t.Set[str]] = None
286291
interval_end_per_model: t.Optional[t.Dict[str, int]] = None
287292
execution_time: t.Optional[TimeLike] = None
293+
disabled_restatement_models: t.Set[str]
288294

289295
def is_selected_for_backfill(self, model_fqn: str) -> bool:
290296
return self.models_to_backfill is None or model_fqn in self.models_to_backfill

sqlmesh/core/plan/evaluator.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -395,7 +395,9 @@ def _restate(self, plan: EvaluatablePlan, snapshots_by_name: t.Dict[str, Snapsho
395395
#
396396
# Without this rule, its possible that promoting a dev table to prod will introduce old data to prod
397397
snapshot_intervals_to_restate.update(
398-
self._restatement_intervals_across_all_environments(plan.restatements)
398+
self._restatement_intervals_across_all_environments(
399+
plan.restatements, plan.disabled_restatement_models
400+
)
399401
)
400402

401403
self.state_sync.remove_intervals(
@@ -404,12 +406,12 @@ def _restate(self, plan: EvaluatablePlan, snapshots_by_name: t.Dict[str, Snapsho
404406
)
405407

406408
def _restatement_intervals_across_all_environments(
407-
self, prod_restatements: t.Dict[str, Interval]
409+
self, prod_restatements: t.Dict[str, Interval], disable_restatement_models: t.Set[str]
408410
) -> t.Set[t.Tuple[SnapshotTableInfo, Interval]]:
409411
"""
410412
Given a map of snapshot names + intervals to restate in prod:
411413
- Look up matching snapshots across all environments (match based on name - regardless of version)
412-
- For each match, also match downstream snapshots
414+
- For each match, also match downstream snapshots while filtering out models that have restatement disabled
413415
- Return all matches mapped to the intervals of the prod snapshot being restated
414416
415417
The goal here is to produce a list of intervals to invalidate across all environments so that a cadence
@@ -430,7 +432,11 @@ def _restatement_intervals_across_all_environments(
430432
for restatement, intervals in prod_restatements.items():
431433
if restatement not in keyed_snapshots:
432434
continue
433-
affected_snapshot_names = [restatement] + env_dag.downstream(restatement)
435+
affected_snapshot_names = [
436+
x
437+
for x in ([restatement] + env_dag.downstream(restatement))
438+
if x not in disable_restatement_models
439+
]
434440
snapshots_to_restate.update(
435441
{(keyed_snapshots[a], intervals) for a in affected_snapshot_names}
436442
)

tests/core/test_integration.py

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2156,6 +2156,118 @@ def _dates_in_table(table_name: str) -> t.List[str]:
21562156
], f"Table {tbl} wasnt cleared"
21572157

21582158

2159+
def test_restatement_plan_respects_disable_restatements(tmp_path: Path):
2160+
model_a = """
2161+
MODEL (
2162+
name test.a,
2163+
kind INCREMENTAL_BY_TIME_RANGE (
2164+
time_column "ts"
2165+
),
2166+
start '2024-01-01',
2167+
cron '@daily'
2168+
);
2169+
2170+
select account_id, ts from test.external_table;
2171+
"""
2172+
2173+
model_b = """
2174+
MODEL (
2175+
name test.b,
2176+
kind INCREMENTAL_BY_TIME_RANGE (
2177+
time_column "ts",
2178+
disable_restatement true,
2179+
),
2180+
start '2024-01-01',
2181+
cron '@daily'
2182+
);
2183+
2184+
select account_id, ts from test.a;
2185+
"""
2186+
2187+
models_dir = tmp_path / "models"
2188+
models_dir.mkdir()
2189+
2190+
for path, defn in {"a.sql": model_a, "b.sql": model_b}.items():
2191+
with open(models_dir / path, "w") as f:
2192+
f.write(defn)
2193+
2194+
config = Config(model_defaults=ModelDefaultsConfig(dialect="duckdb"))
2195+
ctx = Context(paths=[tmp_path], config=config)
2196+
2197+
engine_adapter = ctx.engine_adapter
2198+
engine_adapter.create_schema("test")
2199+
2200+
# source data
2201+
df = pd.DataFrame(
2202+
{
2203+
"account_id": [1001, 1002, 1003, 1004],
2204+
"ts": [
2205+
"2024-01-01 00:30:00",
2206+
"2024-01-01 01:30:00",
2207+
"2024-01-01 02:30:00",
2208+
"2024-01-02 00:30:00",
2209+
],
2210+
}
2211+
)
2212+
columns_to_types = {
2213+
"account_id": exp.DataType.build("int"),
2214+
"ts": exp.DataType.build("timestamp"),
2215+
}
2216+
external_table = exp.table_(table="external_table", db="test", quoted=True)
2217+
engine_adapter.create_table(table_name=external_table, columns_to_types=columns_to_types)
2218+
engine_adapter.insert_append(
2219+
table_name=external_table, query_or_df=df, columns_to_types=columns_to_types
2220+
)
2221+
2222+
# plan + apply
2223+
ctx.plan(auto_apply=True, no_prompts=True)
2224+
2225+
def _dates_in_table(table_name: str) -> t.List[str]:
2226+
return [
2227+
str(r[0]) for r in engine_adapter.fetchall(f"select ts from {table_name} order by ts")
2228+
]
2229+
2230+
def get_snapshot_intervals(snapshot_id):
2231+
return list(ctx.state_sync.get_snapshots([snapshot_id]).values())[0].intervals
2232+
2233+
# verify initial state
2234+
for tbl in ["test.a", "test.b"]:
2235+
assert _dates_in_table(tbl) == [
2236+
"2024-01-01 00:30:00",
2237+
"2024-01-01 01:30:00",
2238+
"2024-01-01 02:30:00",
2239+
"2024-01-02 00:30:00",
2240+
]
2241+
2242+
# restate A and expect b to be ignored
2243+
starting_b_intervals = get_snapshot_intervals(ctx.snapshots['"memory"."test"."b"'].snapshot_id)
2244+
engine_adapter.execute("delete from test.external_table where ts = '2024-01-01 01:30:00'")
2245+
ctx.plan(
2246+
restate_models=["test.a"],
2247+
start="2024-01-01",
2248+
end="2024-01-02",
2249+
auto_apply=True,
2250+
no_prompts=True,
2251+
)
2252+
2253+
# verify A was changed and not b
2254+
assert _dates_in_table("test.a") == [
2255+
"2024-01-01 00:30:00",
2256+
"2024-01-01 02:30:00",
2257+
"2024-01-02 00:30:00",
2258+
]
2259+
assert _dates_in_table("test.b") == [
2260+
"2024-01-01 00:30:00",
2261+
"2024-01-01 01:30:00",
2262+
"2024-01-01 02:30:00",
2263+
"2024-01-02 00:30:00",
2264+
]
2265+
2266+
# Verify B intervals were not touched
2267+
b_intervals = get_snapshot_intervals(ctx.snapshots['"memory"."test"."b"'].snapshot_id)
2268+
assert starting_b_intervals == b_intervals
2269+
2270+
21592271
def test_restatement_plan_clears_correct_intervals_across_environments(tmp_path: Path):
21602272
model1 = """
21612273
MODEL (

tests/schedulers/airflow/test_client.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ def test_apply_plan(mocker: MockerFixture, snapshot: Snapshot):
7777
removed_snapshots=[],
7878
requires_backfill=True,
7979
models_to_backfill={'"test_model"'},
80+
disabled_restatement_models=set(),
8081
)
8182

8283
client = AirflowClient(airflow_url=common.AIRFLOW_LOCAL_URL, session=requests.Session())
@@ -194,6 +195,7 @@ def test_apply_plan(mocker: MockerFixture, snapshot: Snapshot):
194195
'"test_model"': [to_timestamp("2024-01-01"), to_timestamp("2024-01-02")]
195196
},
196197
"requires_backfill": True,
198+
"disabled_restatement_models": [],
197199
},
198200
"notification_targets": [],
199201
"backfill_concurrent_tasks": 1,

tests/schedulers/airflow/test_integration.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ def _create_evaluatable_plan(
115115
indirectly_modified_snapshots={},
116116
removed_snapshots=[],
117117
requires_backfill=True,
118+
disabled_restatement_models=set(),
118119
)
119120

120121

tests/schedulers/airflow/test_plan.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ def test_create_plan_dag_spec(
125125
interval_end_per_model=None,
126126
allow_destructive_models=set(),
127127
requires_backfill=True,
128+
disabled_restatement_models=set(),
128129
)
129130

130131
plan_request = common.PlanApplicationRequest(
@@ -268,6 +269,7 @@ def test_restatement(
268269
interval_end_per_model=None,
269270
allow_destructive_models=set(),
270271
requires_backfill=True,
272+
disabled_restatement_models=set(),
271273
)
272274

273275
plan_request = common.PlanApplicationRequest(
@@ -389,6 +391,7 @@ def test_select_models_for_backfill(mocker: MockerFixture, random_name, make_sna
389391
interval_end_per_model=None,
390392
allow_destructive_models=set(),
391393
requires_backfill=True,
394+
disabled_restatement_models=set(),
392395
)
393396

394397
plan_request = common.PlanApplicationRequest(
@@ -474,6 +477,7 @@ def test_create_plan_dag_spec_duplicated_snapshot(
474477
interval_end_per_model=None,
475478
allow_destructive_models=set(),
476479
requires_backfill=True,
480+
disabled_restatement_models=set(),
477481
)
478482

479483
plan_request = common.PlanApplicationRequest(
@@ -536,6 +540,7 @@ def test_create_plan_dag_spec_unbounded_end(
536540
interval_end_per_model=None,
537541
allow_destructive_models=set(),
538542
requires_backfill=True,
543+
disabled_restatement_models=set(),
539544
)
540545

541546
plan_request = common.PlanApplicationRequest(

0 commit comments

Comments
 (0)