Skip to content

Commit 440eb55

Browse files
authored
Fix: Only insert new seed rows during migration (#2381)
1 parent ff1b6fe commit 440eb55

File tree

1 file changed

+25
-16
lines changed

1 file changed

+25
-16
lines changed

sqlmesh/core/state_sync/engine_adapter.py

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -373,13 +373,7 @@ def delete_snapshots(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> None:
373373
self.engine_adapter.delete_from(self.seeds_table, where=where)
374374

375375
def snapshots_exist(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> t.Set[SnapshotId]:
376-
return {
377-
SnapshotId(name=name, identifier=identifier)
378-
for where in self._snapshot_id_filter(snapshot_ids)
379-
for name, identifier in self._fetchall(
380-
exp.select("name", "identifier").from_(self.snapshots_table).where(where)
381-
)
382-
}
376+
return self._snapshot_ids_exist(snapshot_ids, self.snapshots_table)
383377

384378
def nodes_exist(self, names: t.Iterable[str], exclude_external: bool = False) -> t.Set[str]:
385379
names = set(names)
@@ -1166,19 +1160,23 @@ def _migrate_seed_rows(self, snapshot_mapping: t.Dict[SnapshotId, SnapshotTableI
11661160
if not seeds:
11671161
continue
11681162

1169-
new_seeds = []
1163+
new_seeds = {}
11701164
for snapshot_id, content in seeds.items():
11711165
new_snapshot_id = snapshot_mapping[snapshot_id].snapshot_id
1172-
new_seeds.append(
1173-
{
1174-
"name": new_snapshot_id.name,
1175-
"identifier": new_snapshot_id.identifier,
1176-
"content": content,
1177-
}
1178-
)
1166+
new_seeds[new_snapshot_id] = {
1167+
"name": new_snapshot_id.name,
1168+
"identifier": new_snapshot_id.identifier,
1169+
"content": content,
1170+
}
1171+
1172+
existing_snapshot_ids = self._snapshot_ids_exist(new_seeds, self.seeds_table)
1173+
seeds_to_push = [
1174+
s for s_id, s in new_seeds.items() if s_id not in existing_snapshot_ids
1175+
]
1176+
11791177
self.engine_adapter.insert_append(
11801178
self.seeds_table,
1181-
pd.DataFrame(new_seeds),
1179+
pd.DataFrame(seeds_to_push),
11821180
columns_to_types=self._seed_columns_to_types,
11831181
contains_json=True,
11841182
)
@@ -1217,6 +1215,17 @@ def _migrate_environment_rows(
12171215
except Exception:
12181216
logger.warning("Failed to unpause migrated snapshots", exc_info=True)
12191217

1218+
def _snapshot_ids_exist(
1219+
self, snapshot_ids: t.Iterable[SnapshotIdLike], table_name: exp.Table
1220+
) -> t.Set[SnapshotId]:
1221+
return {
1222+
SnapshotId(name=name, identifier=identifier)
1223+
for where in self._snapshot_id_filter(snapshot_ids)
1224+
for name, identifier in self._fetchall(
1225+
exp.select("name", "identifier").from_(table_name).where(where)
1226+
)
1227+
}
1228+
12201229
def _snapshot_id_filter(
12211230
self,
12221231
snapshot_ids: t.Iterable[SnapshotIdLike],

0 commit comments

Comments
 (0)