Skip to content

Commit 38360b8

Browse files
committed
Revert plan-related triggers
1 parent b645f63 commit 38360b8

File tree

6 files changed

+5
-126
lines changed

6 files changed

+5
-126
lines changed

sqlmesh/core/console.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3830,10 +3830,6 @@ def update_snapshot_evaluation_progress(
38303830
message += f" | auto_restatement_triggers={','.join(trigger.name for trigger in snapshot_evaluation_triggers.auto_restatement_triggers)}"
38313831
if snapshot_evaluation_triggers.select_snapshot_triggers:
38323832
message += f" | select_snapshot_triggers={','.join(trigger.name for trigger in snapshot_evaluation_triggers.select_snapshot_triggers)}"
3833-
if snapshot_evaluation_triggers.directly_modified_triggers:
3834-
message += f" | directly_modified_triggers={','.join(trigger.name for trigger in snapshot_evaluation_triggers.directly_modified_triggers)}"
3835-
if snapshot_evaluation_triggers.restatement_triggers:
3836-
message += f" | restatement_triggers={','.join(trigger.name for trigger in snapshot_evaluation_triggers.restatement_triggers)}"
38373833

38383834
if audit_only:
38393835
message = f"Audited {snapshot.name} duration={duration_ms}ms | num_audits_passed={num_audits_passed} | num_audits_failed={num_audits_failed}"

sqlmesh/core/plan/builder.py

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,7 @@ def build(self) -> Plan:
293293
else DeployabilityIndex.all_deployable()
294294
)
295295

296-
restatements, restatement_triggers = self._build_restatements(
296+
restatements = self._build_restatements(
297297
dag,
298298
earliest_interval_start(self._context_diff.snapshots.values(), self.execution_time),
299299
)
@@ -330,7 +330,6 @@ def build(self) -> Plan:
330330
indirectly_modified=indirectly_modified,
331331
deployability_index=deployability_index,
332332
restatements=restatements,
333-
restatement_triggers=restatement_triggers,
334333
start_override_per_model=self._start_override_per_model,
335334
end_override_per_model=end_override_per_model,
336335
selected_models_to_backfill=self._backfill_models,
@@ -353,14 +352,14 @@ def _build_dag(self) -> DAG[SnapshotId]:
353352

354353
def _build_restatements(
355354
self, dag: DAG[SnapshotId], earliest_interval_start: TimeLike
356-
) -> t.Tuple[t.Dict[SnapshotId, Interval], t.Dict[SnapshotId, t.List[SnapshotId]]]:
355+
) -> t.Dict[SnapshotId, Interval]:
357356
restate_models = self._restate_models
358357
if restate_models == set():
359358
# This is a warning but we print this as error since the Console is lacking API for warnings.
360359
self._console.log_error(
361360
"Provided restated models do not match any models. No models will be included in plan."
362361
)
363-
return {}, {}
362+
return {}
364363

365364
restatements: t.Dict[SnapshotId, Interval] = {}
366365
forward_only_preview_needed = self._forward_only_preview_needed
@@ -384,7 +383,7 @@ def _build_restatements(
384383
is_preview = True
385384

386385
if not restate_models:
387-
return {}, {}
386+
return {}
388387

389388
start = self._start or earliest_interval_start
390389
end = self._end or now()
@@ -394,7 +393,6 @@ def _build_restatements(
394393
if model_fqn not in self._model_fqn_to_snapshot:
395394
raise PlanError(f"Cannot restate model '{model_fqn}'. Model does not exist.")
396395

397-
restatement_triggers: t.Dict[SnapshotId, t.List[SnapshotId]] = {}
398396
# Get restatement intervals for all restated snapshots and make sure that if an incremental snapshot expands it's
399397
# restatement range that it's downstream dependencies all expand their restatement ranges as well.
400398
for s_id in dag:
@@ -430,13 +428,6 @@ def _build_restatements(
430428
logger.info("Skipping restatement for model '%s'", snapshot.name)
431429
continue
432430

433-
if snapshot.name in restate_models:
434-
restatement_triggers[s_id] = [s_id]
435-
if restating_parents:
436-
restatement_triggers[s_id] = restatement_triggers.get(s_id, []) + [
437-
s.snapshot_id for s in restating_parents
438-
]
439-
440431
possible_intervals = {
441432
restatements[p.snapshot_id] for p in restating_parents if p.is_incremental
442433
}
@@ -465,7 +456,7 @@ def _build_restatements(
465456

466457
restatements[s_id] = (snapshot_start, snapshot_end)
467458

468-
return restatements, restatement_triggers
459+
return restatements
469460

470461
def _build_directly_and_indirectly_modified(
471462
self, dag: DAG[SnapshotId]

sqlmesh/core/plan/definition.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@ class Plan(PydanticModel, frozen=True):
5858

5959
deployability_index: DeployabilityIndex
6060
restatements: t.Dict[SnapshotId, Interval]
61-
restatement_triggers: t.Dict[SnapshotId, t.List[SnapshotId]] = {}
6261
start_override_per_model: t.Optional[t.Dict[str, datetime]]
6362
end_override_per_model: t.Optional[t.Dict[str, datetime]]
6463

@@ -257,7 +256,6 @@ def to_evaluatable(self) -> EvaluatablePlan:
257256
skip_backfill=self.skip_backfill,
258257
empty_backfill=self.empty_backfill,
259258
restatements={s.name: i for s, i in self.restatements.items()},
260-
restatement_triggers=self.restatement_triggers,
261259
is_dev=self.is_dev,
262260
allow_destructive_models=self.allow_destructive_models,
263261
forward_only=self.forward_only,
@@ -300,7 +298,6 @@ class EvaluatablePlan(PydanticModel):
300298
skip_backfill: bool
301299
empty_backfill: bool
302300
restatements: t.Dict[str, Interval]
303-
restatement_triggers: t.Dict[SnapshotId, t.List[SnapshotId]] = {}
304301
is_dev: bool
305302
allow_destructive_models: t.Set[str]
306303
forward_only: bool

sqlmesh/core/plan/evaluator.py

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
SnapshotCreationFailedError,
3838
SnapshotNameVersion,
3939
)
40-
from sqlmesh.core.snapshot.definition import SnapshotEvaluationTriggers
4140
from sqlmesh.utils import to_snake_case
4241
from sqlmesh.core.state_sync import StateSync
4342
from sqlmesh.utils import CorrelationId
@@ -235,27 +234,6 @@ def visit_backfill_stage(self, stage: stages.BackfillStage, plan: EvaluatablePla
235234
self.console.log_success("SKIP: No model batches to execute")
236235
return
237236

238-
directly_modified_triggers: t.Dict[SnapshotId, t.List[SnapshotId]] = {}
239-
for parent, children in plan.indirectly_modified_snapshots.items():
240-
parent_id = stage.all_snapshots[parent].snapshot_id
241-
directly_modified_triggers[parent_id] = directly_modified_triggers.get(
242-
parent_id, []
243-
) + [parent_id]
244-
for child in children:
245-
directly_modified_triggers[child] = directly_modified_triggers.get(child, []) + [
246-
parent_id
247-
]
248-
directly_modified_triggers = {
249-
k: list(dict.fromkeys(v)) for k, v in directly_modified_triggers.items()
250-
}
251-
snapshot_evaluation_triggers = {
252-
s_id: SnapshotEvaluationTriggers(
253-
directly_modified_triggers=directly_modified_triggers.get(s_id, []),
254-
restatement_triggers=plan.restatement_triggers.get(s_id, []),
255-
)
256-
for s_id in [s.snapshot_id for s in stage.all_snapshots.values()]
257-
}
258-
259237
scheduler = self.create_scheduler(stage.all_snapshots.values(), self.snapshot_evaluator)
260238
errors, _ = scheduler.run_merged_intervals(
261239
merged_intervals=stage.snapshot_to_intervals,

sqlmesh/core/snapshot/definition.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -332,8 +332,6 @@ class SnapshotEvaluationTriggers(PydanticModel):
332332
cron_ready: t.Optional[bool] = None
333333
auto_restatement_triggers: t.List[SnapshotId] = []
334334
select_snapshot_triggers: t.List[SnapshotId] = []
335-
directly_modified_triggers: t.List[SnapshotId] = []
336-
restatement_triggers: t.List[SnapshotId] = []
337335

338336

339337
class SnapshotInfoMixin(ModelKindMixin):

tests/core/test_integration.py

Lines changed: 0 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626

2727

2828
from sqlmesh import CustomMaterialization
29-
import sqlmesh
3029
from sqlmesh.cli.project_init import init_example_project
3130
from sqlmesh.core import constants as c
3231
from sqlmesh.core import dialect as d
@@ -1860,27 +1859,6 @@ def test_snapshot_triggers(init_and_plan_context: t.Callable, mocker: MockerFixt
18601859
context, plan = init_and_plan_context("examples/sushi")
18611860
context.apply(plan)
18621861

1863-
# modify 3 models
1864-
# - 2 breaking changes for testing plan directly modified triggers
1865-
# - 1 adding an auto-restatement for subsequent `run` test
1866-
marketing = context.get_model("sushi.marketing")
1867-
marketing_kwargs = {
1868-
**marketing.dict(),
1869-
"query": d.parse_one(
1870-
f"{marketing.query.sql(dialect='duckdb')} ORDER BY customer_id", dialect="duckdb"
1871-
),
1872-
}
1873-
context.upsert_model(SqlModel.parse_obj(marketing_kwargs))
1874-
1875-
customers = context.get_model("sushi.customers")
1876-
customers_kwargs = {
1877-
**customers.dict(),
1878-
"query": d.parse_one(
1879-
f"{customers.query.sql(dialect='duckdb')} ORDER BY customer_id", dialect="duckdb"
1880-
),
1881-
}
1882-
context.upsert_model(SqlModel.parse_obj(customers_kwargs))
1883-
18841862
# add auto restatement to orders
18851863
orders = context.get_model("sushi.orders")
18861864
orders_kind = {
@@ -1893,67 +1871,8 @@ def test_snapshot_triggers(init_and_plan_context: t.Callable, mocker: MockerFixt
18931871
}
18941872
context.upsert_model(PythonModel.parse_obj(orders_kwargs))
18951873

1896-
spy = mocker.spy(sqlmesh.core.scheduler.Scheduler, "run_merged_intervals")
1897-
18981874
context.plan(auto_apply=True, no_prompts=True, categorizer_config=CategorizerConfig.all_full())
18991875

1900-
# PLAN: directly modified triggers
1901-
actual_triggers = spy.call_args.kwargs["snapshot_evaluation_triggers"]
1902-
actual_triggers_name = {
1903-
k.name: sorted([s.name for s in v.directly_modified_triggers])
1904-
for k, v in actual_triggers.items()
1905-
if v.directly_modified_triggers
1906-
}
1907-
marketing_name = '"memory"."sushi"."marketing"'
1908-
customers_name = '"memory"."sushi"."customers"'
1909-
marketing_customers_names = sorted([marketing_name, customers_name])
1910-
children_names = [
1911-
f'"memory"."sushi"."{model}"'
1912-
for model in {
1913-
"waiter_as_customer_by_day",
1914-
"active_customers",
1915-
"count_customers_active",
1916-
"count_customers_inactive",
1917-
}
1918-
]
1919-
assert actual_triggers_name == {
1920-
marketing_name: [marketing_name],
1921-
customers_name: [customers_name],
1922-
**{k: marketing_customers_names for k in children_names},
1923-
}
1924-
1925-
# PLAN: restatement triggers
1926-
spy.reset_mock()
1927-
context.plan(
1928-
restate_models=[
1929-
'"memory"."sushi"."marketing"',
1930-
'"memory"."sushi"."order_items"',
1931-
'"memory"."sushi"."waiter_revenue_by_day"',
1932-
],
1933-
auto_apply=True,
1934-
no_prompts=True,
1935-
)
1936-
1937-
order_items_name = '"memory"."sushi"."order_items"'
1938-
waiter_revenue_by_day_name = '"memory"."sushi"."waiter_revenue_by_day"'
1939-
actual_triggers = spy.call_args.kwargs["snapshot_evaluation_triggers"]
1940-
actual_triggers_name = {
1941-
k.name: sorted([s.name for s in v.restatement_triggers])
1942-
for k, v in actual_triggers.items()
1943-
if v.restatement_triggers
1944-
}
1945-
1946-
assert sorted(actual_triggers_name[waiter_revenue_by_day_name]) == sorted(
1947-
[waiter_revenue_by_day_name, order_items_name]
1948-
)
1949-
assert actual_triggers_name[order_items_name] == [order_items_name]
1950-
assert actual_triggers_name['"memory"."sushi"."top_waiters"'] == [waiter_revenue_by_day_name]
1951-
assert actual_triggers_name['"memory"."sushi"."customer_revenue_by_day"'] == [order_items_name]
1952-
assert actual_triggers_name['"memory"."sushi"."customer_revenue_lifetime"'] == [
1953-
order_items_name
1954-
]
1955-
1956-
# RUN: select and auto-restatement triggers
19571876
# User selects top_waiters and waiter_revenue_by_day, others added as auto-upstream
19581877
selected_models = {"top_waiters", "waiter_revenue_by_day"}
19591878
selected_models_auto_upstream = {"order_items", "orders", "items"}

0 commit comments

Comments
 (0)