Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion sqlmesh/core/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,10 @@ def run_node(node: SchedulingUnit) -> None:
execution_time=execution_time,
)
else:
# If batch_index > 0, then the target table must exist since the first batch would have created it
target_table_exists = (
snapshot.snapshot_id not in snapshots_to_create or node.batch_index > 0
)
audit_results = self.evaluate(
snapshot=snapshot,
environment_naming_info=environment_naming_info,
Expand All @@ -548,7 +552,7 @@ def run_node(node: SchedulingUnit) -> None:
batch_index=node.batch_index,
allow_destructive_snapshots=allow_destructive_snapshots,
allow_additive_snapshots=allow_additive_snapshots,
target_table_exists=snapshot.snapshot_id not in snapshots_to_create,
target_table_exists=target_table_exists,
selected_models=selected_models,
)

Expand Down
33 changes: 33 additions & 0 deletions tests/core/integration/test_change_scenarios.py
Original file line number Diff line number Diff line change
Expand Up @@ -1482,3 +1482,36 @@ def test_annotated_self_referential_model(init_and_plan_context: t.Callable):

df = context.fetchdf("SELECT one FROM memory.sushi.test_self_ref")
assert len(df) == 0


@time_machine.travel("2023-01-08 00:00:00 UTC")
def test_creating_stage_for_first_batch_only(init_and_plan_context: t.Callable):
context, _ = init_and_plan_context("examples/sushi")

expressions = d.parse(
"""
MODEL (
name memory.sushi.test_batch_size,
kind INCREMENTAL_BY_UNIQUE_KEY (
unique_key one,
batch_size 1,
),

start '2023-01-01',
);

CREATE SCHEMA IF NOT EXISTS test_schema;
CREATE TABLE IF NOT EXISTS test_schema.creating_counter (a INT);

SELECT 1::INT AS one;

@IF(@runtime_stage = 'creating', INSERT INTO test_schema.creating_counter (a) VALUES (1));
"""
)
model = load_sql_based_model(expressions)
context.upsert_model(model)

context.plan("prod", skip_tests=True, no_prompts=True, auto_apply=True)
assert (
context.engine_adapter.fetchone("SELECT COUNT(*) FROM test_schema.creating_counter")[0] == 1
)