Skip to content

Commit bc1d6bb

Browse files
authored
Fix: No gaps check for non-representative snapshots (#1704)
1 parent 0fb1b91 commit bc1d6bb

File tree

9 files changed

+44
-34
lines changed

9 files changed

+44
-34
lines changed

sqlmesh/core/plan/evaluator.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ def evaluate(self, plan: Plan) -> None:
101101
self._push(plan, deployability_index)
102102
self._restate(plan)
103103
self._backfill(plan, before_promote_snapshots, deployability_index)
104-
promotion_result = self._promote(plan, deployability_index)
104+
promotion_result = self._promote(plan, before_promote_snapshots)
105105
self._backfill(plan, after_promote_snapshots, deployability_index)
106106
self._update_views(plan, promotion_result, deployability_index)
107107

@@ -176,16 +176,18 @@ def on_complete(snapshot: SnapshotInfoLike) -> None:
176176
self.state_sync.push_snapshots(plan.new_snapshots)
177177

178178
def _promote(
179-
self, plan: Plan, deployability_index: t.Optional[DeployabilityIndex] = None
179+
self, plan: Plan, no_gaps_snapshot_names: t.Optional[t.Set[str]] = None
180180
) -> PromotionResult:
181181
"""Promote a plan.
182182
183183
Args:
184184
plan: The plan to promote.
185-
deployability_index: Indicates which snapshots are deployable in the context of this promotion.
185+
no_gaps_snapshot_names: The names of snapshots to check for gaps if the no gaps check is enabled in the plan.
186+
If not provided, all snapshots are checked.
186187
"""
187188
promotion_result = self.state_sync.promote(
188-
plan.environment, deployability_index=deployability_index, no_gaps=plan.no_gaps
189+
plan.environment,
190+
no_gaps_snapshot_names=no_gaps_snapshot_names if plan.no_gaps else set(),
189191
)
190192

191193
if not plan.is_dev:

sqlmesh/core/state_sync/base.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
from sqlmesh import migrations
1212
from sqlmesh.core.environment import Environment, EnvironmentNamingInfo
1313
from sqlmesh.core.snapshot import (
14-
DeployabilityIndex,
1514
Snapshot,
1615
SnapshotId,
1716
SnapshotIdLike,
@@ -326,17 +325,16 @@ def refresh_snapshot_intervals(self, snapshots: t.Collection[Snapshot]) -> t.Lis
326325
def promote(
327326
self,
328327
environment: Environment,
329-
deployability_index: t.Optional[DeployabilityIndex] = None,
330-
no_gaps: bool = False,
328+
no_gaps_snapshot_names: t.Optional[t.Set[str]] = None,
331329
) -> PromotionResult:
332330
"""Update the environment to reflect the current state.
333331
334332
This method verifies that snapshots have been pushed.
335333
336334
Args:
337335
environment: The environment to promote.
338-
deployability_index: Determines snapshots that are deployable in the context of this promotion.
339-
no_gaps: Whether to ensure that new snapshots for models that are already a
336+
no_gaps_snapshot_names: A set of snapshot names to check for data gaps. If None,
337+
all snapshots will be checked. The data gap check ensures that models that are already a
340338
part of the target environment have no data gaps when compared against previous
341339
snapshots for same models.
342340

sqlmesh/core/state_sync/common.py

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
from sqlmesh.core.dialect import schema_
1111
from sqlmesh.core.environment import Environment
1212
from sqlmesh.core.snapshot import (
13-
DeployabilityIndex,
1413
Snapshot,
1514
SnapshotId,
1615
SnapshotIdLike,
@@ -100,17 +99,16 @@ def get_environment(self, environment: str) -> t.Optional[Environment]:
10099
def promote(
101100
self,
102101
environment: Environment,
103-
deployability_index: t.Optional[DeployabilityIndex] = None,
104-
no_gaps: bool = False,
102+
no_gaps_snapshot_names: t.Optional[t.Set[str]] = None,
105103
) -> PromotionResult:
106104
"""Update the environment to reflect the current state.
107105
108106
This method verifies that snapshots have been pushed.
109107
110108
Args:
111109
environment: The environment to promote.
112-
deployability_index: Determines snapshots that are deployable in the context of this promotion.
113-
no_gaps: Whether to ensure that new snapshots for models that are already a
110+
no_gaps_snapshot_names: A set of snapshot names to check for data gaps. If None,
111+
all snapshots will be checked. The data gap check ensures that models that are already a
114112
part of the target environment have no data gaps when compared against previous
115113
snapshots for same models.
116114
@@ -144,12 +142,12 @@ def promote(
144142
"Please recreate the plan and try again"
145143
)
146144

147-
if no_gaps:
145+
if no_gaps_snapshot_names != set():
148146
snapshots = self._get_snapshots(environment.snapshots).values()
149147
self._ensure_no_gaps(
150148
snapshots,
151149
existing_environment,
152-
deployability_index or DeployabilityIndex.all_deployable(),
150+
no_gaps_snapshot_names,
153151
)
154152

155153
existing_table_infos = {
@@ -295,7 +293,7 @@ def _ensure_no_gaps(
295293
self,
296294
target_snapshots: t.Iterable[Snapshot],
297295
target_environment: Environment,
298-
deployability_index: DeployabilityIndex,
296+
snapshot_names: t.Optional[t.Set[str]],
299297
) -> None:
300298
target_snapshots_by_name = {s.name: s for s in target_snapshots}
301299

@@ -314,7 +312,7 @@ def _ensure_no_gaps(
314312
for prev_snapshot in prev_snapshots:
315313
target_snapshot = target_snapshots_by_name[prev_snapshot.name]
316314
if (
317-
deployability_index.is_representative(target_snapshot)
315+
(snapshot_names is None or prev_snapshot.name in snapshot_names)
318316
and target_snapshot.is_incremental
319317
and prev_snapshot.is_incremental
320318
and prev_snapshot.intervals

sqlmesh/schedulers/airflow/common.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ class PlanDagSpec(PydanticModel):
8787
environment_expiration_ts: t.Optional[int] = None
8888
dag_start_ts: t.Optional[int] = None
8989
deployability_index: DeployabilityIndex = DeployabilityIndex.all_deployable()
90+
no_gaps_snapshot_names: t.Optional[t.Set[str]] = None
9091

9192

9293
class EnvironmentsResponse(PydanticModel):

sqlmesh/schedulers/airflow/dag_generator.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -297,8 +297,9 @@ def _create_promotion_demotion_tasks(
297297
python_callable=promotion_update_state_task,
298298
op_kwargs={
299299
"environment": environment,
300-
"deployability_index": request.deployability_index,
301-
"no_gaps": request.no_gaps,
300+
"no_gaps_snapshot_names": request.no_gaps_snapshot_names
301+
if request.no_gaps
302+
else set(),
302303
},
303304
)
304305

@@ -571,11 +572,10 @@ def creation_update_state_task(new_snapshots: t.Iterable[Snapshot]) -> None:
571572

572573
def promotion_update_state_task(
573574
environment: Environment,
574-
deployability_index: DeployabilityIndex,
575-
no_gaps: bool,
575+
no_gaps_snapshot_names: t.Optional[t.Set[str]],
576576
) -> None:
577577
with util.scoped_state_sync() as state_sync:
578-
state_sync.promote(environment, deployability_index=deployability_index, no_gaps=no_gaps)
578+
state_sync.promote(environment, no_gaps_snapshot_names=no_gaps_snapshot_names)
579579

580580

581581
def promotion_unpause_snapshots_task(

sqlmesh/schedulers/airflow/plan.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,14 @@ def create_plan_dag_spec(
146146
for s, intervals in backfill_batches.items()
147147
]
148148

149+
no_gaps_snapshot_names = (
150+
{s.name for s in all_snapshots.values() if initial_deployability_index.is_representative(s)}
151+
if request.no_gaps and not request.is_dev
152+
else None
153+
if request.no_gaps
154+
else set()
155+
)
156+
149157
return common.PlanDagSpec(
150158
request_id=request.request_id,
151159
environment_naming_info=request.environment.naming_info,
@@ -168,6 +176,7 @@ def create_plan_dag_spec(
168176
environment_expiration_ts=request.environment.expiration_ts,
169177
dag_start_ts=now_timestamp(),
170178
deployability_index=deployability_index,
179+
no_gaps_snapshot_names=no_gaps_snapshot_names,
171180
)
172181

173182

sqlmesh/schedulers/airflow/state_sync.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
from sqlmesh.core.console import Console
77
from sqlmesh.core.environment import Environment
88
from sqlmesh.core.snapshot import (
9-
DeployabilityIndex,
109
Snapshot,
1110
SnapshotId,
1211
SnapshotIdLike,
@@ -231,17 +230,16 @@ def refresh_snapshot_intervals(self, snapshots: t.Collection[Snapshot]) -> t.Lis
231230
def promote(
232231
self,
233232
environment: Environment,
234-
deployability_index: t.Optional[DeployabilityIndex] = None,
235-
no_gaps: bool = False,
233+
no_gaps_snapshot_names: t.Optional[t.Set[str]] = None,
236234
) -> PromotionResult:
237235
"""Update the environment to reflect the current state.
238236
239237
This method verifies that snapshots have been pushed.
240238
241239
Args:
242240
environment: The environment to promote.
243-
deployability_index: Determines snapshots that are deployable in the context of this promotion.
244-
no_gaps: Whether to ensure that new snapshots for models that are already a
241+
no_gaps_snapshot_names: A set of snapshot names to check for data gaps. If None,
242+
all snapshots will be checked. The data gap check ensures that models that are already a
245243
part of the target environment have no data gaps when compared against previous
246244
snapshots for same models.
247245

tests/core/test_state_sync.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
SqlModel,
2323
)
2424
from sqlmesh.core.snapshot import (
25-
DeployabilityIndex,
2625
Snapshot,
2726
SnapshotChangeCategory,
2827
SnapshotId,
@@ -77,8 +76,8 @@ def promote_snapshots(
7776
state_sync: EngineAdapterStateSync,
7877
snapshots: t.List[Snapshot],
7978
environment: str,
80-
deployability_index: t.Optional[DeployabilityIndex] = None,
8179
no_gaps: bool = False,
80+
no_gaps_snapshot_names: t.Optional[t.Set[str]] = None,
8281
environment_suffix_target: EnvironmentSuffixTarget = EnvironmentSuffixTarget.SCHEMA,
8382
) -> PromotionResult:
8483
env = Environment(
@@ -90,7 +89,9 @@ def promote_snapshots(
9089
plan_id="test_plan_id",
9190
previous_plan_id="test_plan_id",
9291
)
93-
return state_sync.promote(env, deployability_index=deployability_index, no_gaps=no_gaps)
92+
return state_sync.promote(
93+
env, no_gaps_snapshot_names=no_gaps_snapshot_names if no_gaps else set()
94+
)
9495

9596

9697
def delete_versions(state_sync: EngineAdapterStateSync) -> None:
@@ -671,8 +672,8 @@ def test_promote_snapshots_no_gaps(state_sync: EngineAdapterStateSync, make_snap
671672
state_sync,
672673
[new_snapshot_missing_interval],
673674
"prod",
674-
deployability_index=DeployabilityIndex.none_deployable(),
675675
no_gaps=True,
676+
no_gaps_snapshot_names=set(),
676677
)
677678

678679

tests/schedulers/airflow/test_plan.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,8 @@ def test_create_plan_dag_spec(
141141
state_sync_mock.get_snapshot_intervals.return_value = []
142142
state_sync_mock.refresh_snapshot_intervals.return_value = []
143143

144+
expected_no_gaps_snapshot_names = {the_snapshot.name} if not paused_forward_only else set()
145+
144146
with mock.patch(
145147
"sqlmesh.schedulers.airflow.plan.now_timestamp",
146148
side_effect=lambda: to_timestamp("2023-01-01"),
@@ -174,7 +176,7 @@ def test_create_plan_dag_spec(
174176
is_dev=False,
175177
forward_only=True,
176178
dag_start_ts=to_timestamp("2023-01-01"),
177-
deployability_index=DeployabilityIndex.all_deployable(),
179+
no_gaps_snapshot_names=expected_no_gaps_snapshot_names,
178180
)
179181

180182
state_sync_mock.get_snapshots.assert_called_once()
@@ -286,7 +288,7 @@ def test_restatement(
286288
is_dev=False,
287289
forward_only=True,
288290
dag_start_ts=to_timestamp(now_value),
289-
deployability_index=DeployabilityIndex.all_deployable(),
291+
no_gaps_snapshot_names={the_snapshot.name},
290292
)
291293

292294
state_sync_mock.get_snapshots.assert_called_once()
@@ -392,6 +394,7 @@ def test_select_models_for_backfill(mocker: MockerFixture, random_name, make_sna
392394
forward_only=True,
393395
dag_start_ts=to_timestamp("2023-01-01"),
394396
deployability_index=DeployabilityIndex.all_deployable(),
397+
no_gaps_snapshot_names={"a", "b"},
395398
)
396399

397400

0 commit comments

Comments
 (0)