Skip to content

Commit 95fabfc

Browse files
authored
Fix: Make sure there is only one copy of a new snapshot with the same ID when migration snapshot rows (#1712)
1 parent 78e45eb commit 95fabfc

File tree

1 file changed

+20
-11
lines changed

1 file changed

+20
-11
lines changed

sqlmesh/core/state_sync/engine_adapter.py

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -832,7 +832,8 @@ def _migrate_rows(self) -> None:
832832
)
833833
environments = self.get_environments()
834834

835-
snapshot_mapping = {}
835+
snapshot_id_mapping: t.Dict[SnapshotId, SnapshotId] = {}
836+
new_snapshots: t.Dict[SnapshotId, Snapshot] = {}
836837

837838
if all_snapshots:
838839
self.console.start_migration_progress(len(all_snapshots))
@@ -902,10 +903,18 @@ def _migrate_rows(self) -> None:
902903
logger.debug(f"{new_snapshot.snapshot_id} exists.")
903904
continue
904905

905-
snapshot_mapping[snapshot.snapshot_id] = new_snapshot
906-
logger.debug(f"{snapshot.snapshot_id} mapped to {new_snapshot.snapshot_id}.")
906+
new_snapshot_id = new_snapshot.snapshot_id
907907

908-
if not snapshot_mapping:
908+
if (
909+
new_snapshot_id not in new_snapshots
910+
or new_snapshot.updated_ts > new_snapshots[new_snapshot_id].updated_ts
911+
):
912+
new_snapshots[new_snapshot_id] = new_snapshot
913+
914+
snapshot_id_mapping[snapshot.snapshot_id] = new_snapshot_id
915+
logger.debug(f"{snapshot.snapshot_id} mapped to {new_snapshot_id}.")
916+
917+
if not snapshot_id_mapping:
909918
logger.debug("No changes to snapshots detected.")
910919
return
911920

@@ -915,14 +924,15 @@ def map_data_versions(
915924
version_ids = ((version.snapshot_id(name), version) for version in versions)
916925

917926
return tuple(
918-
snapshot_mapping[version_id].data_version
919-
if version_id in snapshot_mapping
927+
new_snapshots[snapshot_id_mapping[version_id]].data_version
928+
if version_id in snapshot_id_mapping
920929
else version
921930
for version_id, version in version_ids
922931
)
923932

924-
for from_snapshot_id, to_snapshot in snapshot_mapping.items():
933+
for from_snapshot_id, to_snapshot_id in snapshot_id_mapping.items():
925934
from_snapshot = all_snapshots[from_snapshot_id]
935+
to_snapshot = new_snapshots[to_snapshot_id]
926936
to_snapshot.previous_versions = map_data_versions(
927937
from_snapshot.name, from_snapshot.previous_versions
928938
)
@@ -931,15 +941,14 @@ def map_data_versions(
931941
for name, versions in from_snapshot.indirect_versions.items()
932942
}
933943

934-
new_snapshots = set(snapshot_mapping.values())
935-
self._push_snapshots(new_snapshots, overwrite=True)
944+
self._push_snapshots(new_snapshots.values(), overwrite=True)
936945

937946
updated_prod_environment: t.Optional[Environment] = None
938947
updated_environments = []
939948
for environment in environments:
940949
snapshots = [
941-
snapshot_mapping[info.snapshot_id].table_info
942-
if info.snapshot_id in snapshot_mapping
950+
new_snapshots[snapshot_id_mapping[info.snapshot_id]].table_info
951+
if info.snapshot_id in snapshot_id_mapping
943952
else info
944953
for info in environment.snapshots
945954
]

0 commit comments

Comments
 (0)