|
36 | 36 | SnapshotCreationFailedError, |
37 | 37 | ) |
38 | 38 | from sqlmesh.utils import CompletionStatus |
39 | | -from sqlmesh.core.state_sync import StateSync |
| 39 | +from sqlmesh.core.state_sync import StateSync, StateReader |
40 | 40 | from sqlmesh.core.state_sync.base import PromotionResult |
41 | 41 | from sqlmesh.utils.concurrency import NodeExecutionFailedError |
42 | 42 | from sqlmesh.utils.errors import PlanError |
@@ -284,23 +284,7 @@ def _push( |
284 | 284 | new_snapshots=plan.new_snapshots, plan_id=plan.plan_id |
285 | 285 | ) |
286 | 286 |
|
287 | | - promoted_snapshot_ids = ( |
288 | | - set(plan.environment.promoted_snapshot_ids) |
289 | | - if plan.environment.promoted_snapshot_ids is not None |
290 | | - else None |
291 | | - ) |
292 | | - |
293 | | - def _should_create(s: Snapshot) -> bool: |
294 | | - if not s.is_model or s.is_symbolic: |
295 | | - return False |
296 | | - # Only create tables for snapshots that we're planning to promote or that were selected for backfill |
297 | | - return ( |
298 | | - plan.is_selected_for_backfill(s.name) |
299 | | - or promoted_snapshot_ids is None |
300 | | - or s.snapshot_id in promoted_snapshot_ids |
301 | | - ) |
302 | | - |
303 | | - snapshots_to_create = [s for s in snapshots.values() if _should_create(s)] |
| 287 | + snapshots_to_create = get_snapshots_to_create(plan, snapshots) |
304 | 288 |
|
305 | 289 | completion_status = None |
306 | 290 | progress_stopped = False |
@@ -573,32 +557,7 @@ def _run_audits_for_metadata_snapshots( |
573 | 557 | plan: EvaluatablePlan, |
574 | 558 | new_snapshots: t.Dict[SnapshotId, Snapshot], |
575 | 559 | ) -> None: |
576 | | - # Filter out snapshots that are not categorized as metadata changes on models |
577 | | - metadata_snapshots = [] |
578 | | - for snapshot in new_snapshots.values(): |
579 | | - if not snapshot.is_metadata or not snapshot.is_model or not snapshot.evaluatable: |
580 | | - continue |
581 | | - |
582 | | - metadata_snapshots.append(snapshot) |
583 | | - |
584 | | - # Bulk load all the previous snapshots |
585 | | - previous_snapshots = self.state_sync.get_snapshots( |
586 | | - [ |
587 | | - s.previous_version.snapshot_id(s.name) |
588 | | - for s in metadata_snapshots |
589 | | - if s.previous_version |
590 | | - ] |
591 | | - ).values() |
592 | | - |
593 | | - # Check if any of the snapshots have modifications to the audits field by comparing the hashes |
594 | | - audit_snapshots = {} |
595 | | - for snapshot, previous_snapshot in zip(metadata_snapshots, previous_snapshots): |
596 | | - new_audits_hash = snapshot.model.audit_metadata_hash() |
597 | | - previous_audit_hash = previous_snapshot.model.audit_metadata_hash() |
598 | | - |
599 | | - if snapshot.model.audits and previous_audit_hash != new_audits_hash: |
600 | | - audit_snapshots[snapshot.snapshot_id] = snapshot |
601 | | - |
| 560 | + audit_snapshots = get_audit_only_snapshots(new_snapshots, self.state_sync) |
602 | 561 | if not audit_snapshots: |
603 | 562 | return |
604 | 563 |
|
@@ -636,3 +595,52 @@ def update_intervals_for_new_snapshots( |
636 | 595 |
|
637 | 596 | if snapshots_intervals: |
638 | 597 | state_sync.add_snapshots_intervals(snapshots_intervals) |
| 598 | + |
| 599 | + |
def get_audit_only_snapshots(
    new_snapshots: t.Dict[SnapshotId, Snapshot], state_reader: StateReader
) -> t.Dict[SnapshotId, Snapshot]:
    """Return new metadata-only model snapshots whose audits changed vs. their previous version.

    Args:
        new_snapshots: Snapshots that are new in the current plan, keyed by snapshot ID.
        state_reader: State reader used to bulk-load the previous snapshot versions.

    Returns:
        A mapping of snapshot ID to snapshot for every candidate that has audits and whose
        audit metadata hash differs from its previous version's.
    """
    # Only snapshots categorized as metadata changes on evaluatable models can be
    # audit-only changes.
    metadata_snapshots = [
        s for s in new_snapshots.values() if s.is_metadata and s.is_model and s.evaluatable
    ]

    # Map each candidate to its previous version's snapshot ID. Pairing by ID (rather
    # than zipping two lists) keeps current/previous aligned even when some candidates
    # have no previous version or a previous snapshot is missing from state; a plain
    # zip over the filtered request list would mis-pair and silently truncate.
    previous_ids = {
        s.snapshot_id: s.previous_version.snapshot_id(s.name)
        for s in metadata_snapshots
        if s.previous_version
    }

    # Bulk load all the previous snapshots in a single state call.
    previous_by_id = state_reader.get_snapshots(list(previous_ids.values()))

    # Check if any of the snapshots have modifications to the audits field by comparing
    # the audit metadata hashes.
    audit_snapshots = {}
    for snapshot in metadata_snapshots:
        previous_id = previous_ids.get(snapshot.snapshot_id)
        previous_snapshot = previous_by_id.get(previous_id) if previous_id else None
        if previous_snapshot is None:
            # No previous version to diff against; nothing to re-audit.
            continue
        if (
            snapshot.model.audits
            and snapshot.model.audit_metadata_hash()
            != previous_snapshot.model.audit_metadata_hash()
        ):
            audit_snapshots[snapshot.snapshot_id] = snapshot

    return audit_snapshots
| 625 | + |
| 626 | + |
def get_snapshots_to_create(
    plan: EvaluatablePlan, snapshots: t.Dict[SnapshotId, Snapshot]
) -> t.List[Snapshot]:
    """Select the snapshots whose physical tables should be created for this plan.

    Symbolic and non-model snapshots never get tables. Of the remaining model
    snapshots, a table is created when the snapshot was selected for backfill,
    when the plan's environment does not restrict promotion (no promoted set),
    or when the snapshot is part of the promoted set.

    Args:
        plan: The plan being applied.
        snapshots: All snapshots in the plan, keyed by snapshot ID.

    Returns:
        The list of snapshots to create tables for, in `snapshots` iteration order.
    """
    raw_promoted = plan.environment.promoted_snapshot_ids
    # None means "no restriction": every eligible model snapshot qualifies.
    promoted = set(raw_promoted) if raw_promoted is not None else None

    to_create: t.List[Snapshot] = []
    for candidate in snapshots.values():
        if not candidate.is_model or candidate.is_symbolic:
            continue
        # Only create tables for snapshots that we're planning to promote or that
        # were selected for backfill.
        if (
            plan.is_selected_for_backfill(candidate.name)
            or promoted is None
            or candidate.snapshot_id in promoted
        ):
            to_create.append(candidate)
    return to_create
0 commit comments