Skip to content

Commit cd29995

Browse files
committed
Fix: Incorrect treatment of the model's table as missing when processing batches with index > 0
1 parent 4638b5c commit cd29995

File tree

2 files changed

+39
-1
lines changed

2 files changed

+39
-1
lines changed

sqlmesh/core/scheduler.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -538,6 +538,10 @@ def run_node(node: SchedulingUnit) -> None:
538538
execution_time=execution_time,
539539
)
540540
else:
541+
# If batch_index > 0, then the target table must exist since the first batch would have created it
542+
target_table_exists = (
543+
snapshot.snapshot_id not in snapshots_to_create or node.batch_index > 0
544+
)
541545
audit_results = self.evaluate(
542546
snapshot=snapshot,
543547
environment_naming_info=environment_naming_info,
@@ -548,7 +552,7 @@ def run_node(node: SchedulingUnit) -> None:
548552
batch_index=node.batch_index,
549553
allow_destructive_snapshots=allow_destructive_snapshots,
550554
allow_additive_snapshots=allow_additive_snapshots,
551-
target_table_exists=snapshot.snapshot_id not in snapshots_to_create,
555+
target_table_exists=target_table_exists,
552556
selected_models=selected_models,
553557
)
554558

tests/core/integration/test_change_scenarios.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1482,3 +1482,37 @@ def test_annotated_self_referential_model(init_and_plan_context: t.Callable):
14821482

14831483
df = context.fetchdf("SELECT one FROM memory.sushi.test_self_ref")
14841484
assert len(df) == 0
1485+
1486+
1487+
@time_machine.travel("2023-01-08 00:00:00 UTC")
1488+
def test_creating_stage_for_first_batch_only(init_and_plan_context: t.Callable):
1489+
context, _ = init_and_plan_context("examples/sushi")
1490+
1491+
# Projections are fully annotated in the query but columns were not specified explicitly
1492+
expressions = d.parse(
1493+
f"""
1494+
MODEL (
1495+
name memory.sushi.test_batch_size,
1496+
kind INCREMENTAL_BY_UNIQUE_KEY (
1497+
unique_key one,
1498+
batch_size 1,
1499+
),
1500+
1501+
start '2023-01-01',
1502+
);
1503+
1504+
CREATE SCHEMA IF NOT EXISTS test_schema;
1505+
CREATE TABLE IF NOT EXISTS test_schema.creating_counter (a INT);
1506+
1507+
SELECT 1::INT AS one;
1508+
1509+
@IF(@runtime_stage = 'creating', INSERT INTO test_schema.creating_counter (a) VALUES (1));
1510+
"""
1511+
)
1512+
model = load_sql_based_model(expressions)
1513+
context.upsert_model(model)
1514+
1515+
context.plan("prod", skip_tests=True, no_prompts=True, auto_apply=True)
1516+
assert (
1517+
context.engine_adapter.fetchone("SELECT COUNT(*) FROM test_schema.creating_counter")[0] == 1
1518+
)

0 commit comments

Comments
 (0)