Skip to content

Commit fda88aa

Browse files
authored
fix: changing catalog env override existing catalog matches (#2025)
* fix: changing catalog env override existing catalog matches * rewrite environment view change detection
1 parent b906682 commit fda88aa

File tree

2 files changed

+133
-69
lines changed

2 files changed

+133
-69
lines changed

sqlmesh/core/state_sync/common.py

Lines changed: 27 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
SnapshotIdLike,
1616
SnapshotInfoLike,
1717
SnapshotNameVersionLike,
18+
SnapshotTableInfo,
1819
start_date,
1920
)
2021
from sqlmesh.core.state_sync.base import PromotionResult, StateSync
@@ -128,77 +129,62 @@ def promote(
128129

129130
existing_environment = self._get_environment(environment.name, lock_for_update=True)
130131

131-
environment_suffix_target_changed = False
132-
environment_catalog_changed = False
133-
removed_environment_naming_info = environment.naming_info
132+
existing_table_infos = (
133+
{table_info.name: table_info for table_info in existing_environment.promoted_snapshots}
134+
if existing_environment
135+
else {}
136+
)
137+
table_infos = {table_info.name: table_info for table_info in environment.promoted_snapshots}
138+
views_that_changed_location: t.Set[SnapshotTableInfo] = set()
134139
if existing_environment:
140+
views_that_changed_location = {
141+
existing_table_info
142+
for name, existing_table_info in existing_table_infos.items()
143+
if name in table_infos
144+
and existing_table_info.qualified_view_name.for_environment(
145+
existing_environment.naming_info
146+
)
147+
!= table_infos[name].qualified_view_name.for_environment(environment.naming_info)
148+
}
135149
if environment.previous_plan_id != existing_environment.plan_id:
136150
raise SQLMeshError(
137151
f"Plan '{environment.plan_id}' is no longer valid for the target environment '{environment.name}'. "
138152
f"Expected previous plan ID: '{environment.previous_plan_id}', actual previous plan ID: '{existing_environment.plan_id}'. "
139153
"Please recreate the plan and try again"
140154
)
141-
142-
environment_suffix_target_changed = (
143-
environment.suffix_target != existing_environment.suffix_target
144-
)
145-
if environment_suffix_target_changed:
146-
removed_environment_naming_info.suffix_target = existing_environment.suffix_target
147-
148-
environment_catalog_changed = (
149-
environment.catalog_name_override != existing_environment.catalog_name_override
150-
)
151-
if environment_catalog_changed:
152-
removed_environment_naming_info.catalog_name_override = (
153-
existing_environment.catalog_name_override
154-
)
155-
156155
if no_gaps_snapshot_names != set():
157156
snapshots = self._get_snapshots(environment.snapshots).values()
158157
self._ensure_no_gaps(
159158
snapshots,
160159
existing_environment,
161160
no_gaps_snapshot_names,
162161
)
163-
164-
existing_table_infos = {
165-
table_info.name: table_info
166-
for table_info in existing_environment.promoted_snapshots
167-
}
168-
169162
demoted_snapshots = set(existing_environment.snapshots) - set(environment.snapshots)
170163
for demoted_snapshot in self._get_snapshots(demoted_snapshots).values():
171164
# Update the updated_at attribute.
172165
self._update_snapshot(demoted_snapshot)
173-
else:
174-
existing_table_infos = {}
175166

176167
missing_models = set(existing_table_infos) - {
177168
snapshot.name for snapshot in environment.promoted_snapshots
178169
}
179170

180-
table_infos = set(environment.promoted_snapshots)
181-
if (
182-
existing_environment
183-
and existing_environment.finalized_ts
184-
and not environment_suffix_target_changed
185-
and not environment_catalog_changed
186-
):
171+
added_table_infos = set(table_infos.values())
172+
if existing_environment and existing_environment.finalized_ts:
187173
# Only promote new snapshots.
188-
table_infos -= set(existing_environment.promoted_snapshots)
174+
added_table_infos -= set(existing_environment.promoted_snapshots)
189175

190176
self._update_environment(environment)
191177

192-
removed = (
193-
list(existing_table_infos.values())
194-
if environment_suffix_target_changed or environment_catalog_changed
195-
else [existing_table_infos[name] for name in missing_models]
178+
removed = {existing_table_infos[name] for name in missing_models}.union(
179+
views_that_changed_location
196180
)
197181

198182
return PromotionResult(
199-
added=sorted(table_infos),
200-
removed=removed,
201-
removed_environment_naming_info=removed_environment_naming_info if removed else None,
183+
added=sorted(added_table_infos),
184+
removed=list(removed),
185+
removed_environment_naming_info=existing_environment.naming_info
186+
if removed and existing_environment
187+
else None,
202188
)
203189

204190
@transactional()

tests/core/test_state_sync.py

Lines changed: 106 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -473,13 +473,11 @@ def test_promote_snapshots(state_sync: EngineAdapterStateSync, make_snapshot: t.
473473
[snapshot_a, snapshot_b_old, snapshot_c],
474474
"prod",
475475
)
476-
assert set(promotion_result.added) == set(
477-
[
478-
snapshot_a.table_info,
479-
snapshot_b_old.table_info,
480-
snapshot_c.table_info,
481-
]
482-
)
476+
assert set(promotion_result.added) == {
477+
snapshot_a.table_info,
478+
snapshot_b_old.table_info,
479+
snapshot_c.table_info,
480+
}
483481
assert not promotion_result.removed
484482
assert not promotion_result.removed_environment_naming_info
485483

@@ -550,7 +548,7 @@ def test_promote_snapshots_suffix_change(
550548
environment_suffix_target=EnvironmentSuffixTarget.TABLE,
551549
)
552550

553-
assert set(promotion_result.added) == set([snapshot_a.table_info, snapshot_b.table_info])
551+
assert set(promotion_result.added) == {snapshot_a.table_info, snapshot_b.table_info}
554552
assert not promotion_result.removed
555553
assert not promotion_result.removed_environment_naming_info
556554

@@ -572,76 +570,156 @@ def test_promote_snapshots_suffix_change(
572570
)
573571

574572
# We still only add the snapshots that are included in the promotion
575-
assert set(promotion_result.added) == set([snapshot_b.table_info, snapshot_c.table_info])
576-
# We also remove b because of the suffix target change. The new one will be created in the new suffix target
577-
assert set(promotion_result.removed) == set([snapshot_a.table_info, snapshot_b.table_info])
573+
assert set(promotion_result.added) == {snapshot_b.table_info, snapshot_c.table_info}
574+
# B does not get removed because the suffix target change doesn't affect it due to running in prod.
575+
assert set(promotion_result.removed) == {snapshot_a.table_info}
578576
# Make sure the removed suffix target is correctly seen as table
579-
assert promotion_result.removed_environment_naming_info
577+
assert promotion_result.removed_environment_naming_info is not None
580578
assert promotion_result.removed_environment_naming_info.suffix_target.is_table
581579

580+
promotion_result = promote_snapshots(
581+
state_sync,
582+
[snapshot_b, snapshot_c],
583+
"dev",
584+
environment_suffix_target=EnvironmentSuffixTarget.SCHEMA,
585+
)
586+
587+
# We still only add the snapshots that are included in the promotion
588+
assert set(promotion_result.added) == {snapshot_b.table_info, snapshot_c.table_info}
589+
assert len(promotion_result.removed) == 0
590+
assert promotion_result.removed_environment_naming_info is None
591+
592+
promotion_result = promote_snapshots(
593+
state_sync,
594+
[snapshot_b, snapshot_c],
595+
"dev",
596+
environment_suffix_target=EnvironmentSuffixTarget.TABLE,
597+
)
598+
599+
# All snapshots are promoted due to suffix target change
600+
assert set(promotion_result.added) == {
601+
snapshot_b.table_info,
602+
snapshot_c.table_info,
603+
}
604+
# All snapshots are removed due to suffix target change
605+
assert set(promotion_result.removed) == {
606+
snapshot_b.table_info,
607+
snapshot_c.table_info,
608+
}
609+
assert promotion_result.removed_environment_naming_info is not None
610+
assert promotion_result.removed_environment_naming_info.suffix_target.is_schema
611+
582612

583613
def test_promote_snapshots_catalog_name_override_change(
584614
state_sync: EngineAdapterStateSync, make_snapshot: t.Callable
585615
):
586616
snapshot_a = make_snapshot(
587617
SqlModel(
588-
name="a",
618+
name="catalog1.schema.a",
589619
query=parse_one("select 1, ds"),
590620
),
591621
)
592622
snapshot_a.categorize_as(SnapshotChangeCategory.BREAKING)
593623

594624
snapshot_b = make_snapshot(
595625
SqlModel(
596-
name="b",
626+
name="catalog1.schema.b",
597627
kind=FullKind(),
598628
query=parse_one("select * from a"),
599629
),
600630
nodes={"a": snapshot_a.model},
601631
)
602632
snapshot_b.categorize_as(SnapshotChangeCategory.BREAKING)
603633

604-
state_sync.push_snapshots([snapshot_a, snapshot_b])
634+
snapshot_c = make_snapshot(
635+
SqlModel(
636+
name="catalog2.schema.c",
637+
kind=FullKind(),
638+
query=parse_one("select * from a"),
639+
),
640+
nodes={"a": snapshot_a.model},
641+
)
642+
snapshot_c.categorize_as(SnapshotChangeCategory.BREAKING)
643+
644+
state_sync.push_snapshots([snapshot_a, snapshot_b, snapshot_c])
605645

606646
promotion_result = promote_snapshots(
607647
state_sync,
608-
[snapshot_a, snapshot_b],
648+
[snapshot_a, snapshot_b, snapshot_c],
609649
"prod",
610-
environment_suffix_target=EnvironmentSuffixTarget.TABLE,
611650
environment_catalog_mapping={},
612651
)
613652

614-
assert set(promotion_result.added) == set([snapshot_a.table_info, snapshot_b.table_info])
653+
assert set(promotion_result.added) == {
654+
snapshot_a.table_info,
655+
snapshot_b.table_info,
656+
snapshot_c.table_info,
657+
}
615658
assert not promotion_result.removed
616659
assert not promotion_result.removed_environment_naming_info
617660

618-
snapshot_c = make_snapshot(
661+
snapshot_d = make_snapshot(
619662
SqlModel(
620-
name="c",
663+
name="catalog1.schema.d",
621664
query=parse_one("select 3, ds"),
622665
),
623666
)
624-
snapshot_c.categorize_as(SnapshotChangeCategory.BREAKING)
667+
snapshot_d.categorize_as(SnapshotChangeCategory.BREAKING)
625668

626-
state_sync.push_snapshots([snapshot_c])
669+
state_sync.push_snapshots([snapshot_d])
627670

628671
promotion_result = promote_snapshots(
629672
state_sync,
630-
[snapshot_b, snapshot_c],
673+
[snapshot_b, snapshot_c, snapshot_d],
631674
"prod",
632675
environment_catalog_mapping={
633-
re.compile("^prod$"): "prod_catalog",
676+
re.compile("^prod$"): "catalog1",
634677
},
635678
)
636679

637-
# We still only add the snapshots that are included in the promotion
638-
assert set(promotion_result.added) == set([snapshot_b.table_info, snapshot_c.table_info])
639-
# We also remove b because of the catalog change. The new one will be created in the new catalog
640-
assert set(promotion_result.removed) == set([snapshot_a.table_info, snapshot_b.table_info])
680+
# We still only add the snapshots that are included in the promotion which means removing A
681+
assert set(promotion_result.added) == {
682+
snapshot_b.table_info,
683+
snapshot_c.table_info,
684+
snapshot_d.table_info,
685+
}
686+
# C is removed because of the catalog change. The new one will be created in the new catalog.
687+
# B is not removed because its catalog did not change and therefore removing would actually result
688+
# in dropping what we just added.
689+
# A is removed because it was explicitly removed from the promotion.
690+
assert set(promotion_result.removed) == {snapshot_a.table_info, snapshot_c.table_info}
641691
# Make sure the removed suffix target correctly has the old catalog name set
642692
assert promotion_result.removed_environment_naming_info
643693
assert promotion_result.removed_environment_naming_info.catalog_name_override is None
644694

695+
promotion_result = promote_snapshots(
696+
state_sync,
697+
[snapshot_b, snapshot_c, snapshot_d],
698+
"prod",
699+
environment_catalog_mapping={
700+
re.compile("^prod$"): "catalog2",
701+
},
702+
)
703+
704+
# All are added since their catalog was changed
705+
assert set(promotion_result.added) == {
706+
snapshot_b.table_info,
707+
snapshot_c.table_info,
708+
snapshot_d.table_info,
709+
}
710+
# All are removed since they were moved from their old catalog location
711+
# Note that C has a catalog set in the model definition of `catalog2` which is what we moved to so you might think
712+
# it shouldn't be removed, but its actual catalog was `catalog1` because of the previous override so therefore
713+
# it should be removed from `catalog1`.
714+
assert set(promotion_result.removed) == {
715+
snapshot_b.table_info,
716+
snapshot_c.table_info,
717+
snapshot_d.table_info,
718+
}
719+
# Make sure the removed suffix target correctly has the old catalog name set
720+
assert promotion_result.removed_environment_naming_info
721+
assert promotion_result.removed_environment_naming_info.catalog_name_override == "catalog1"
722+
645723

646724
def test_promote_snapshots_parent_plan_id_mismatch(
647725
state_sync: EngineAdapterStateSync, make_snapshot: t.Callable

0 commit comments

Comments
 (0)