Skip to content

Commit 34b9a1a

Browse files
authored
Fix: Restatement of models that are present in prod but are missing in dev (#3589)
1 parent 00143b5 commit 34b9a1a

File tree

2 files changed

+95
-0
lines changed

2 files changed

+95
-0
lines changed

sqlmesh/core/plan/evaluator.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -405,6 +405,8 @@ def _restatement_intervals_across_all_environments(
405405
env_dag = DAG({s.name: {p.name for p in s.parents} for s in env.snapshots})
406406

407407
for restatement, intervals in prod_restatements.items():
408+
if restatement not in keyed_snapshots:
409+
continue
408410
affected_snapshot_names = [restatement] + env_dag.downstream(restatement)
409411
snapshots_to_restate.update(
410412
{(keyed_snapshots[a], intervals) for a in affected_snapshot_names}

tests/core/test_integration.py

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2457,6 +2457,99 @@ def _dates_in_table(table_name: str) -> t.List[str]:
24572457
]
24582458

24592459

2460+
def test_prod_restatement_plan_missing_model_in_dev(
2461+
tmp_path: Path,
2462+
):
2463+
"""
2464+
Scenario:
2465+
I have a model B in prod but only model A in dev
2466+
I restate B in prod
2467+
2468+
Outcome:
2469+
The A model should be ignore and the plan shouldn't fail
2470+
"""
2471+
2472+
model_a = """
2473+
MODEL (
2474+
name test.a,
2475+
kind INCREMENTAL_BY_TIME_RANGE (
2476+
time_column "ts"
2477+
),
2478+
start '2024-01-01 00:00:00',
2479+
cron '@hourly'
2480+
);
2481+
2482+
select account_id, ts from test.external_table;
2483+
"""
2484+
2485+
model_b = """
2486+
MODEL (
2487+
name test.b,
2488+
kind INCREMENTAL_BY_TIME_RANGE (
2489+
time_column ts
2490+
),
2491+
cron '@daily'
2492+
);
2493+
2494+
select account_id, ts from test.external_table where ts between @start_ts and @end_ts;
2495+
"""
2496+
2497+
models_dir = tmp_path / "models"
2498+
models_dir.mkdir()
2499+
2500+
with open(models_dir / "a.sql", "w") as f:
2501+
f.write(model_a)
2502+
2503+
config = Config(model_defaults=ModelDefaultsConfig(dialect="duckdb"))
2504+
ctx = Context(paths=[tmp_path], config=config)
2505+
2506+
engine_adapter = ctx.engine_adapter
2507+
engine_adapter.create_schema("test")
2508+
2509+
# source data
2510+
df = pd.DataFrame(
2511+
{
2512+
"account_id": [1001, 1002, 1003, 1004],
2513+
"ts": [
2514+
"2024-01-01 00:30:00",
2515+
"2024-01-01 01:30:00",
2516+
"2024-01-01 02:30:00",
2517+
"2024-01-02 00:30:00",
2518+
],
2519+
}
2520+
)
2521+
columns_to_types = {
2522+
"account_id": exp.DataType.build("int"),
2523+
"ts": exp.DataType.build("timestamp"),
2524+
}
2525+
external_table = exp.table_(table="external_table", db="test", quoted=True)
2526+
engine_adapter.create_table(table_name=external_table, columns_to_types=columns_to_types)
2527+
engine_adapter.insert_append(
2528+
table_name=external_table, query_or_df=df, columns_to_types=columns_to_types
2529+
)
2530+
2531+
# plan + apply A[hourly] in dev
2532+
ctx.plan("dev", auto_apply=True, no_prompts=True)
2533+
2534+
# add B[daily] in prod and remove A
2535+
with open(models_dir / "b.sql", "w") as f:
2536+
f.write(model_b)
2537+
Path(models_dir / "a.sql").unlink()
2538+
2539+
# plan + apply dev
2540+
ctx.load()
2541+
ctx.plan(auto_apply=True, no_prompts=True)
2542+
2543+
# restate B in prod
2544+
ctx.plan(
2545+
restate_models=["test.b"],
2546+
start="2024-01-01",
2547+
end="2024-01-02",
2548+
auto_apply=True,
2549+
no_prompts=True,
2550+
)
2551+
2552+
24602553
@time_machine.travel("2023-01-08 15:00:00 UTC")
24612554
def test_plan_against_expired_environment(init_and_plan_context: t.Callable):
24622555
context, plan = init_and_plan_context("examples/sushi")

0 commit comments

Comments
 (0)