Skip to content

Commit 205a558

Browse files
committed
feat: support batching of janitor cleanup
1 parent a95955c commit 205a558

File tree

4 files changed

+151
-7
lines changed

4 files changed

+151
-7
lines changed

sqlmesh/cli/main.py

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -631,16 +631,35 @@ def invalidate(ctx: click.Context, environment: str, **kwargs: t.Any) -> None:
631631
is_flag=True,
632632
help="Cleanup snapshots that are not referenced in any environment, regardless of when they're set to expire",
633633
)
634+
@click.option(
635+
"--batch-start",
636+
help="The batch start datetime to start processing expired snapshots to avoid large requests.",
637+
)
638+
@click.option(
639+
"--batch-seconds",
640+
"batch_size_seconds",
641+
type=int,
642+
help="When provided with --batch-start, runs the janitor in batches by incrementing the timestamp by this many seconds until reaching the current time.",
643+
)
634644
@click.pass_context
635645
@error_handler
636646
@cli_analytics
637-
def janitor(ctx: click.Context, ignore_ttl: bool, **kwargs: t.Any) -> None:
647+
def janitor(
648+
ctx: click.Context,
649+
ignore_ttl: bool,
650+
batch_start: t.Optional[TimeLike],
651+
batch_size_seconds: t.Optional[int],
652+
) -> None:
638653
"""
639654
Run the janitor process on-demand.
640655
641656
The janitor cleans up old environments and expired snapshots.
642657
"""
643-
ctx.obj.run_janitor(ignore_ttl, **kwargs)
658+
ctx.obj.run_janitor(
659+
ignore_ttl,
660+
batch_start=batch_start,
661+
batch_size_seconds=batch_size_seconds,
662+
)
644663

645664

646665
@cli.command("destroy")

sqlmesh/core/context.py

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -855,12 +855,41 @@ def _has_environment_changed() -> bool:
855855
return completion_status
856856

857857
@python_api_analytics
858-
def run_janitor(self, ignore_ttl: bool) -> bool:
858+
def run_janitor(
859+
self,
860+
ignore_ttl: bool,
861+
batch_start: t.Optional[TimeLike] = None,
862+
batch_size_seconds: t.Optional[int] = None,
863+
) -> bool:
859864
success = False
860865

861866
if self.console.start_cleanup(ignore_ttl):
862867
try:
863-
self._run_janitor(ignore_ttl)
868+
if batch_size_seconds is not None:
869+
if batch_size_seconds <= 0:
870+
raise SQLMeshError("batch_size_seconds must be a positive integer.")
871+
if batch_start is None:
872+
raise SQLMeshError(
873+
"batch_start must be provided when batch_size_seconds is set."
874+
)
875+
876+
current_ts = now_timestamp()
877+
878+
if batch_start is not None:
879+
batch_start = to_timestamp(batch_start)
880+
881+
batch_delta_ms = 0 if batch_size_seconds is None else batch_size_seconds * 1000
882+
883+
next_batch_ts = batch_start if batch_start is not None else current_ts
884+
885+
while True:
886+
self._run_janitor(ignore_ttl, current_ts=next_batch_ts)
887+
888+
if next_batch_ts >= current_ts:
889+
break
890+
891+
next_batch_ts = min(next_batch_ts + batch_delta_ms, current_ts)
892+
864893
success = True
865894
finally:
866895
self.console.stop_cleanup(success=success)
@@ -2846,8 +2875,8 @@ def _destroy(self) -> bool:
28462875

28472876
return True
28482877

2849-
def _run_janitor(self, ignore_ttl: bool = False) -> None:
2850-
current_ts = now_timestamp()
2878+
def _run_janitor(self, ignore_ttl: bool = False, current_ts: t.Optional[int] = None) -> None:
2879+
current_ts = current_ts or now_timestamp()
28512880

28522881
# Clean up expired environments by removing their views and schemas
28532882
self._cleanup_environments(current_ts=current_ts)

tests/cli/test_cli.py

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
from click.testing import CliRunner
1515
from sqlmesh import RuntimeEnv
1616
from sqlmesh.cli.project_init import ProjectTemplate, init_example_project
17-
from sqlmesh.cli.main import cli
17+
from sqlmesh.cli.main import cli, janitor
1818
from sqlmesh.core.context import Context
1919
from sqlmesh.integrations.dlt import generate_dlt_models
2020
from sqlmesh.utils.date import now_ds, time_like_to_str, timedelta, to_datetime, yesterday_ds
@@ -127,6 +127,40 @@ def init_prod_and_backfill(runner, temp_dir) -> None:
127127
assert path.exists(temp_dir / "db.db")
128128

129129

130+
def test_cli_janitor_batch_string(runner: CliRunner, mocker) -> None:
131+
context_mock = mocker.MagicMock()
132+
133+
result = runner.invoke(
134+
janitor,
135+
["--ignore-ttl", "--batch-start", "2025-01-01", "--batch-seconds", "60"],
136+
obj=context_mock,
137+
)
138+
139+
assert result.exit_code == 0
140+
context_mock.run_janitor.assert_called_once_with(
141+
True,
142+
batch_start="2025-01-01",
143+
batch_size_seconds=60,
144+
)
145+
146+
147+
def test_cli_janitor_batch_int(runner: CliRunner, mocker) -> None:
148+
context_mock = mocker.MagicMock()
149+
150+
result = runner.invoke(
151+
janitor,
152+
["--ignore-ttl", "--batch-start", "1735862400000", "--batch-seconds", "60"],
153+
obj=context_mock,
154+
)
155+
156+
assert result.exit_code == 0
157+
context_mock.run_janitor.assert_called_once_with(
158+
True,
159+
batch_start="1735862400000",
160+
batch_size_seconds=60,
161+
)
162+
163+
130164
def assert_duckdb_test(result) -> None:
131165
assert "Successfully Ran 1 tests against duckdb" in result.output
132166

tests/core/test_context.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1057,6 +1057,68 @@ def test_janitor(sushi_context, mocker: MockerFixture) -> None:
10571057
)
10581058

10591059

1060+
@time_machine.travel("2025-01-03 00:00:00 UTC", tick=False)
1061+
def test_run_janitor_default_timestamp(sushi_context: Context, mocker: MockerFixture) -> None:
1062+
context = sushi_context
1063+
mocker.patch.object(context.console, "start_cleanup", return_value=True)
1064+
stop_cleanup = mocker.patch.object(context.console, "stop_cleanup")
1065+
run_janitor = mocker.patch.object(context, "_run_janitor")
1066+
1067+
result = context.run_janitor(ignore_ttl=False)
1068+
1069+
assert result is True
1070+
run_janitor.assert_called_once_with(False, current_ts=1735862400000)
1071+
stop_cleanup.assert_called_once_with(success=True)
1072+
1073+
1074+
@time_machine.travel("2025-01-03 00:00:00 UTC", tick=False)
1075+
def test_run_janitor_batches(sushi_context: Context, mocker: MockerFixture) -> None:
1076+
context = sushi_context
1077+
mocker.patch.object(context.console, "start_cleanup", return_value=True)
1078+
stop_cleanup = mocker.patch.object(context.console, "stop_cleanup")
1079+
run_janitor = mocker.patch.object(context, "_run_janitor")
1080+
1081+
result = context.run_janitor(
1082+
ignore_ttl=False,
1083+
batch_start="2 days ago",
1084+
batch_size_seconds=60 * 60 * 24, # 1 day
1085+
)
1086+
1087+
assert result is True
1088+
assert run_janitor.call_args_list == [
1089+
call(False, current_ts=1735689600000),
1090+
call(False, current_ts=1735776000000),
1091+
call(False, current_ts=1735862400000),
1092+
]
1093+
stop_cleanup.assert_called_once_with(success=True)
1094+
1095+
1096+
def test_run_janitor_batch_requires_current_ts(
1097+
sushi_context: Context, mocker: MockerFixture
1098+
) -> None:
1099+
context = sushi_context
1100+
mocker.patch.object(context.console, "start_cleanup", return_value=True)
1101+
stop_cleanup = mocker.patch.object(context.console, "stop_cleanup")
1102+
1103+
with pytest.raises(SQLMeshError, match="batch_start must be provided"):
1104+
context.run_janitor(ignore_ttl=False, batch_size_seconds=60)
1105+
1106+
stop_cleanup.assert_called_once_with(success=False)
1107+
1108+
1109+
def test_run_janitor_batch_requires_positive_seconds(
1110+
sushi_context: Context, mocker: MockerFixture
1111+
) -> None:
1112+
context = sushi_context
1113+
mocker.patch.object(context.console, "start_cleanup", return_value=True)
1114+
stop_cleanup = mocker.patch.object(context.console, "stop_cleanup")
1115+
1116+
with pytest.raises(SQLMeshError, match="positive integer"):
1117+
context.run_janitor(ignore_ttl=False, batch_start=100, batch_size_seconds=0)
1118+
1119+
stop_cleanup.assert_called_once_with(success=False)
1120+
1121+
10601122
@pytest.mark.slow
10611123
def test_plan_default_end(sushi_context_pre_scheduling: Context):
10621124
prod_plan_builder = sushi_context_pre_scheduling.plan_builder("prod")

0 commit comments

Comments
 (0)