Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 4 additions & 9 deletions sqlmesh/core/snapshot/definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -1644,6 +1644,7 @@ def create(
snapshot.is_valid_start(start, snapshot_start) if start is not None else True
)

children_deployable = is_valid_start and not has_auto_restatement
if (
snapshot.is_forward_only
or snapshot.is_indirect_non_breaking
Expand All @@ -1660,15 +1661,9 @@ def create(
):
# This snapshot represents what's currently deployed in prod.
representative_shared_version_ids.add(node)

# A child can still be deployable even if its parent is not
children_deployable = (
is_valid_start
and not (
snapshot.is_paused and (snapshot.is_forward_only or is_forward_only_model)
)
and not has_auto_restatement
)
else:
# If the parent is not representative then its children can't be deployable.
children_deployable = False
else:
children_deployable = False
if not snapshots[node].is_paused:
Expand Down
40 changes: 40 additions & 0 deletions tests/core/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -1347,6 +1347,46 @@ def test_indirect_non_breaking_change_after_forward_only_in_dev(init_and_plan_co
)


@time_machine.travel("2023-01-08 15:00:00 UTC")
def test_changes_downstream_of_indirect_non_breaking_snapshot_without_intervals(
init_and_plan_context: t.Callable,
):
context, plan = init_and_plan_context("examples/sushi")
context.apply(plan)

# Make a breaking change first but don't backfill it
model = context.get_model("sushi.orders")
model = model.copy(update={"stamp": "force new version"})
context.upsert_model(model)
plan_builder = context.plan_builder(
"dev", skip_backfill=True, skip_tests=True, no_auto_categorization=True
)
plan_builder.set_choice(context.get_snapshot(model), SnapshotChangeCategory.BREAKING)
context.apply(plan_builder.build())

# Now make a non-breaking change to the same snapshot.
model = model.copy(update={"stamp": "force another new version"})
context.upsert_model(model)
plan_builder = context.plan_builder(
"dev", skip_backfill=True, skip_tests=True, no_auto_categorization=True
)
plan_builder.set_choice(context.get_snapshot(model), SnapshotChangeCategory.NON_BREAKING)
context.apply(plan_builder.build())

# Now make a change to a model downstream of the above model.
downstream_model = context.get_model("sushi.top_waiters")
downstream_model = downstream_model.copy(update={"stamp": "yet another new version"})
context.upsert_model(downstream_model)
plan = context.plan_builder("dev", skip_tests=True).build()

# If the parent is not representative then the child cannot be deployable
deployability_index = plan.deployability_index
assert not deployability_index.is_representative(
context.get_snapshot("sushi.waiter_revenue_by_day")
)
assert not deployability_index.is_deployable(context.get_snapshot("sushi.top_waiters"))


@time_machine.travel("2023-01-08 15:00:00 UTC", tick=True)
def test_metadata_change_after_forward_only_results_in_migration(init_and_plan_context: t.Callable):
context, plan = init_and_plan_context("examples/sushi")
Expand Down