Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions sqlmesh/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,12 @@ def diff(ctx: click.Context, environment: t.Optional[str] = None) -> None:
help="Explain the plan instead of applying it.",
default=None,
)
@click.option(
"--ignore-cron",
is_flag=True,
help="Run all missing intervals, ignoring individual cron schedules. Only applies if --run is set.",
default=None,
)
@click.option(
"--min-intervals",
default=0,
Expand All @@ -543,6 +549,7 @@ def plan(
select_models = kwargs.pop("select_model") or None
allow_destructive_models = kwargs.pop("allow_destructive_model") or None
backfill_models = kwargs.pop("backfill_model") or None
ignore_cron = kwargs.pop("ignore_cron") or None
setattr(get_console(), "verbosity", Verbosity(verbose))

context.plan(
Expand All @@ -551,6 +558,7 @@ def plan(
select_models=select_models,
allow_destructive_models=allow_destructive_models,
backfill_models=backfill_models,
ignore_cron=ignore_cron,
**kwargs,
)

Expand Down
5 changes: 5 additions & 0 deletions sqlmesh/core/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -1290,6 +1290,7 @@ def plan(
diff_rendered: t.Optional[bool] = None,
skip_linter: t.Optional[bool] = None,
explain: t.Optional[bool] = None,
ignore_cron: t.Optional[bool] = None,
min_intervals: t.Optional[int] = None,
) -> Plan:
"""Interactively creates a plan.
Expand Down Expand Up @@ -1367,6 +1368,7 @@ def plan(
diff_rendered=diff_rendered,
skip_linter=skip_linter,
explain=explain,
ignore_cron=ignore_cron,
min_intervals=min_intervals,
)

Expand Down Expand Up @@ -1417,6 +1419,7 @@ def plan_builder(
diff_rendered: t.Optional[bool] = None,
skip_linter: t.Optional[bool] = None,
explain: t.Optional[bool] = None,
ignore_cron: t.Optional[bool] = None,
min_intervals: t.Optional[int] = None,
) -> PlanBuilder:
"""Creates a plan builder.
Expand Down Expand Up @@ -1590,6 +1593,7 @@ def plan_builder(
max_interval_end_per_model = None
default_start, default_end = None, None
if not run:
ignore_cron = False
max_interval_end_per_model = self._get_max_interval_end_per_model(
snapshots, backfill_models
)
Expand Down Expand Up @@ -1654,6 +1658,7 @@ def plan_builder(
console=self.console,
user_provided_flags=user_provided_flags,
explain=explain or False,
ignore_cron=ignore_cron or False,
)

def apply(
Expand Down
4 changes: 4 additions & 0 deletions sqlmesh/core/plan/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ class PlanBuilder:
the environment is not finalized.
start_override_per_model: A mapping of model FQNs to target start dates.
end_override_per_model: A mapping of model FQNs to target end dates.
ignore_cron: Whether to ignore the node's cron schedule when computing missing intervals.
explain: Whether to explain the plan instead of applying it.
"""

Expand Down Expand Up @@ -120,6 +121,7 @@ def __init__(
end_bounded: bool = False,
ensure_finalized_snapshots: bool = False,
explain: bool = False,
ignore_cron: bool = False,
start_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
end_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
console: t.Optional[PlanBuilderConsole] = None,
Expand All @@ -137,6 +139,7 @@ def __init__(
self._enable_preview = enable_preview
self._end_bounded = end_bounded
self._ensure_finalized_snapshots = ensure_finalized_snapshots
self._ignore_cron = ignore_cron
self._start_override_per_model = start_override_per_model
self._end_override_per_model = end_override_per_model
self._environment_ttl = environment_ttl
Expand Down Expand Up @@ -335,6 +338,7 @@ def build(self) -> Plan:
execution_time=plan_execution_time,
end_bounded=self._end_bounded,
ensure_finalized_snapshots=self._ensure_finalized_snapshots,
ignore_cron=self._ignore_cron,
user_provided_flags=self._user_provided_flags,
)
self._latest_plan = plan
Expand Down
4 changes: 4 additions & 0 deletions sqlmesh/core/plan/definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class Plan(PydanticModel, frozen=True):
end_bounded: bool
ensure_finalized_snapshots: bool
explain: bool
ignore_cron: bool = False

environment_ttl: t.Optional[str] = None
environment_naming_info: EnvironmentNamingInfo
Expand Down Expand Up @@ -181,6 +182,7 @@ def missing_intervals(self) -> t.List[SnapshotIntervals]:
start_override_per_model=self.start_override_per_model,
end_override_per_model=self.end_override_per_model,
end_bounded=self.end_bounded,
ignore_cron=self.ignore_cron,
).items()
if snapshot.is_model and missing
]
Expand Down Expand Up @@ -259,6 +261,7 @@ def to_evaluatable(self) -> EvaluatablePlan:
forward_only=self.forward_only,
end_bounded=self.end_bounded,
ensure_finalized_snapshots=self.ensure_finalized_snapshots,
ignore_cron=self.ignore_cron,
directly_modified_snapshots=sorted(self.directly_modified),
indirectly_modified_snapshots={
s.name: sorted(snapshot_ids) for s, snapshot_ids in self.indirectly_modified.items()
Expand Down Expand Up @@ -300,6 +303,7 @@ class EvaluatablePlan(PydanticModel):
forward_only: bool
end_bounded: bool
ensure_finalized_snapshots: bool
ignore_cron: bool
directly_modified_snapshots: t.List[SnapshotId]
indirectly_modified_snapshots: t.Dict[str, t.List[SnapshotId]]
metadata_updated_snapshots: t.List[SnapshotId]
Expand Down
1 change: 1 addition & 0 deletions sqlmesh/core/plan/stages.py
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,7 @@ def _missing_intervals(
},
deployability_index=deployability_index,
end_bounded=plan.end_bounded,
ignore_cron=plan.ignore_cron,
start_override_per_model=plan.start_override_per_model,
end_override_per_model=plan.end_override_per_model,
)
Expand Down
7 changes: 7 additions & 0 deletions sqlmesh/magics.py
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,12 @@ def test(self, context: Context, line: str, test_def_raw: t.Optional[str] = None
action="store_true",
help="Run latest intervals as part of the plan application (prod environment only).",
)
@argument(
"--ignore-cron",
action="store_true",
help="Run for all missing intervals, ignoring individual cron schedules. Only applies if --run is set.",
default=None,
)
@argument(
"--enable-preview",
action="store_true",
Expand Down Expand Up @@ -533,6 +539,7 @@ def plan(self, context: Context, line: str) -> None:
select_models=args.select_model,
no_diff=args.no_diff,
run=args.run,
ignore_cron=args.run,
enable_preview=args.enable_preview,
diff_rendered=args.diff_rendered,
)
Expand Down
54 changes: 54 additions & 0 deletions tests/core/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -1609,6 +1609,60 @@ def test_plan_with_run(
}


@time_machine.travel("2023-01-08 15:00:00 UTC")
def test_plan_ignore_cron(
init_and_plan_context: t.Callable,
):
context, _ = init_and_plan_context("examples/sushi")

expressions = d.parse(
f"""
MODEL (
name memory.sushi.test_allow_partials,
kind INCREMENTAL_UNMANAGED,
allow_partials true,
start '2023-01-01',
);

SELECT @end_ts AS end_ts
"""
)
model = load_sql_based_model(expressions)

context.upsert_model(model)
context.plan("prod", skip_tests=True, auto_apply=True, no_prompts=True)

assert (
context.engine_adapter.fetchone("SELECT MAX(end_ts) FROM memory.sushi.test_allow_partials")[
0
]
== "2023-01-07 23:59:59.999999"
)

plan_no_ignore_cron = context.plan_builder(
"prod", run=True, ignore_cron=False, skip_tests=True
).build()
assert not plan_no_ignore_cron.missing_intervals

plan = context.plan_builder("prod", run=True, ignore_cron=True, skip_tests=True).build()
assert plan.missing_intervals == [
SnapshotIntervals(
snapshot_id=context.get_snapshot(model, raise_if_missing=True).snapshot_id,
intervals=[
(to_timestamp("2023-01-08"), to_timestamp("2023-01-08 15:00:00")),
],
)
]
context.apply(plan)

assert (
context.engine_adapter.fetchone("SELECT MAX(end_ts) FROM memory.sushi.test_allow_partials")[
0
]
== "2023-01-08 14:59:59.999999"
)


@time_machine.travel("2023-01-08 15:00:00 UTC")
def test_run_with_select_models_no_auto_upstream(
init_and_plan_context: t.Callable,
Expand Down
64 changes: 64 additions & 0 deletions tests/core/test_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -3359,3 +3359,67 @@ def test_environment_statements_change_allows_dev_environment_creation(make_snap
assert plan is not None
assert plan.context_diff.has_environment_statements_changes
assert plan.context_diff.environment_statements == environment_statements


def test_plan_ignore_cron_flag(make_snapshot):
"""Test that ignore_cron flag is properly stored and propagated through plan objects."""

# Create a snapshot with a daily cron schedule
snapshot_a = make_snapshot(
SqlModel(
name="test_model",
kind=IncrementalByTimeRangeKind(time_column="ds"),
cron="@daily", # Daily cron schedule
start="2023-01-01",
query=parse_one("SELECT 1 as id, ds FROM VALUES ('2023-01-01') t(ds)"),
allow_partials=True,
)
)

# Mock the context diff
context_diff = ContextDiff(
environment="dev",
is_new_environment=True,
is_unfinalized_environment=False,
normalize_environment_name=True,
create_from="prod",
create_from_env_exists=True,
added=set(),
removed_snapshots={},
modified_snapshots={},
snapshots={snapshot_a.snapshot_id: snapshot_a},
new_snapshots={snapshot_a.snapshot_id: snapshot_a},
previous_plan_id=None,
previously_promoted_snapshot_ids=set(),
previous_finalized_snapshots=None,
previous_gateway_managed_virtual_layer=False,
gateway_managed_virtual_layer=False,
environment_statements=[],
)

plan_builder_ignore_cron = PlanBuilder(
context_diff,
start="2023-01-01",
execution_time="2023-01-05 12:00:00",
is_dev=True,
include_unmodified=True,
ignore_cron=True,
end_bounded=False,
)

plan = plan_builder_ignore_cron.build()
assert plan.ignore_cron is True
assert plan.to_evaluatable().ignore_cron is True

assert plan.missing_intervals == [
SnapshotIntervals(
snapshot_id=snapshot_a.snapshot_id,
intervals=[
(to_timestamp("2023-01-01"), to_timestamp("2023-01-02")),
(to_timestamp("2023-01-02"), to_timestamp("2023-01-03")),
(to_timestamp("2023-01-03"), to_timestamp("2023-01-04")),
(to_timestamp("2023-01-04"), to_timestamp("2023-01-05")),
(to_timestamp("2023-01-05"), to_timestamp("2023-01-05 12:00:00")),
],
)
]
Loading