Skip to content

Commit ba33416

Browse files
authored
Feat: Add an option to not run upstream dependencies when selecting models in the command (#3413)
1 parent 1d950bb commit ba33416

File tree

4 files changed

+54
-1
lines changed

4 files changed

+54
-1
lines changed

sqlmesh/cli/main.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -457,6 +457,11 @@ def plan(
457457
type=int,
458458
help="If set, the command will exit with the specified code if the run is interrupted by an update to the target environment.",
459459
)
460+
@click.option(
461+
"--no-auto-upstream",
462+
is_flag=True,
463+
help="Do not automatically include upstream models. Only applicable when --select-model is used. Note: this may result in missing / invalid data for the selected models.",
464+
)
460465
@click.pass_context
461466
@error_handler
462467
@cli_analytics

sqlmesh/core/context.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -597,6 +597,7 @@ def run(
597597
ignore_cron: bool = False,
598598
select_models: t.Optional[t.Collection[str]] = None,
599599
exit_on_env_update: t.Optional[int] = None,
600+
no_auto_upstream: bool = False,
600601
) -> bool:
601602
"""Run the entire dag through the scheduler.
602603
@@ -611,6 +612,7 @@ def run(
611612
upstream dependencies of selected models will also be evaluated.
612613
exit_on_env_update: If set, exits with the provided code if the run is interrupted by an update
613614
to the target environment.
615+
no_auto_upstream: Whether to not force upstream models to run. Only applicable when using `select_models`.
614616
615617
Returns:
616618
True if the run was successful, False otherwise.
@@ -679,6 +681,7 @@ def _has_environment_changed() -> bool:
679681
ignore_cron=ignore_cron,
680682
select_models=select_models,
681683
circuit_breaker=_has_environment_changed,
684+
no_auto_upstream=no_auto_upstream,
682685
)
683686
done = True
684687
except CircuitBreakerError:
@@ -1886,6 +1889,7 @@ def _run(
18861889
ignore_cron: bool,
18871890
select_models: t.Optional[t.Collection[str]],
18881891
circuit_breaker: t.Optional[t.Callable[[], bool]],
1892+
no_auto_upstream: bool,
18891893
) -> bool:
18901894
scheduler = self.scheduler(environment=environment)
18911895
snapshots = scheduler.snapshots
@@ -1898,7 +1902,9 @@ def _run(
18981902
for fqn, model in models.items():
18991903
dag.add(fqn, model.depends_on)
19001904
model_selector = self._new_selector(models=models, dag=dag)
1901-
select_models = set(dag.subdag(*model_selector.expand_model_selections(select_models)))
1905+
select_models = set(model_selector.expand_model_selections(select_models))
1906+
if not no_auto_upstream:
1907+
select_models = set(dag.subdag(*select_models))
19021908

19031909
return scheduler.run(
19041910
environment,

sqlmesh/magics.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -462,6 +462,11 @@ def plan(self, context: Context, line: str) -> None:
462462
type=int,
463463
help="If set, the command will exit with the specified code if the run is interrupted by an update to the target environment.",
464464
)
465+
@argument(
466+
"--no-auto-upstream",
467+
action="store_true",
468+
help="Do not automatically include upstream models. Only applicable when --select-model is used. Note: this may result in missing / invalid data for the selected models.",
469+
)
465470
@line_magic
466471
@pass_sqlmesh_context
467472
def run_dag(self, context: Context, line: str) -> None:
@@ -476,6 +481,7 @@ def run_dag(self, context: Context, line: str) -> None:
476481
ignore_cron=args.ignore_cron,
477482
select_models=args.select_model,
478483
exit_on_env_update=args.exit_on_env_update,
484+
no_auto_upstream=args.no_auto_upstream,
479485
)
480486
if not success:
481487
raise SQLMeshError("Error Running DAG. Check logs for details.")

tests/core/test_integration.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1159,6 +1159,42 @@ def test_run_with_select_models(
11591159
}
11601160

11611161

1162+
@freeze_time("2023-01-08 15:00:00")
1163+
def test_run_with_select_models_no_auto_upstream(
1164+
init_and_plan_context: t.Callable,
1165+
):
1166+
context, _ = init_and_plan_context("examples/sushi")
1167+
1168+
model = context.get_model("sushi.waiter_revenue_by_day")
1169+
model = SqlModel.parse_obj({**model.dict(), "audits": []})
1170+
context.upsert_model(model)
1171+
1172+
context.plan("prod", no_prompts=True, skip_tests=True, auto_apply=True)
1173+
1174+
with freeze_time("2023-01-09 00:00:00"):
1175+
assert context.run(select_models=["*waiter_revenue_by_day"], no_auto_upstream=True)
1176+
1177+
snapshots = context.state_sync.state_sync.get_snapshots(context.snapshots.values())
1178+
# Only waiter_revenue_by_day should be backfilled up to 2023-01-09.
1179+
assert {s.name: s.intervals[0][1] for s in snapshots.values() if s.intervals} == {
1180+
'"memory"."sushi"."waiter_revenue_by_day"': to_timestamp("2023-01-09"),
1181+
'"memory"."sushi"."order_items"': to_timestamp("2023-01-08"),
1182+
'"memory"."sushi"."orders"': to_timestamp("2023-01-08"),
1183+
'"memory"."sushi"."items"': to_timestamp("2023-01-08"),
1184+
'"memory"."sushi"."customer_revenue_lifetime"': to_timestamp("2023-01-08"),
1185+
'"memory"."sushi"."customer_revenue_by_day"': to_timestamp("2023-01-08"),
1186+
'"memory"."sushi"."waiter_names"': to_timestamp("2023-01-08"),
1187+
'"memory"."sushi"."raw_marketing"': to_timestamp("2023-01-08"),
1188+
'"memory"."sushi"."marketing"': to_timestamp("2023-01-08"),
1189+
'"memory"."sushi"."waiter_as_customer_by_day"': to_timestamp("2023-01-08"),
1190+
'"memory"."sushi"."top_waiters"': to_timestamp("2023-01-08"),
1191+
'"memory"."raw"."demographics"': to_timestamp("2023-01-08"),
1192+
"assert_item_price_above_zero": to_timestamp("2023-01-08"),
1193+
'"memory"."sushi"."active_customers"': to_timestamp("2023-01-08"),
1194+
'"memory"."sushi"."customers"': to_timestamp("2023-01-08"),
1195+
}
1196+
1197+
11621198
@freeze_time("2023-01-08 15:00:00")
11631199
def test_select_models(init_and_plan_context: t.Callable):
11641200
context, plan = init_and_plan_context("examples/sushi")

0 commit comments

Comments
 (0)