Skip to content

Commit 7a775c8

Browse files
committed
feat: batch expired snapshots
1 parent a303011 commit 7a775c8

File tree

10 files changed

+740
-137
lines changed

10 files changed

+740
-137
lines changed

docs/reference/configuration.md

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -125,9 +125,10 @@ Formatting settings for the `sqlmesh format` command and UI.
125125

126126
Configuration for the `sqlmesh janitor` command.
127127

128-
| Option | Description | Type | Required |
129-
|--------------------------|----------------------------------------------------------------------------------------------------------------------------|:-------:|:--------:|
130-
| `warn_on_delete_failure` | Whether to warn instead of erroring if the janitor fails to delete the expired environment schema / views (Default: False) | boolean | N |
128+
| Option | Description | Type | Required |
129+
|---------------------------------|----------------------------------------------------------------------------------------------------------------------------|:-------:|:--------:|
130+
| `warn_on_delete_failure` | Whether to warn instead of erroring if the janitor fails to delete the expired environment schema / views (Default: False) | boolean | N |
131+
| `expired_snapshots_batch_size` | Maximum number of expired snapshots to clean in a single batch (Default: 200) | int | N |
131132

132133

133134
## UI

sqlmesh/core/config/janitor.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,26 @@
11
from __future__ import annotations
22

3+
import typing as t
34

45
from sqlmesh.core.config.base import BaseConfig
6+
from sqlmesh.utils.pydantic import field_validator
57

68

79
class JanitorConfig(BaseConfig):
810
"""The configuration for the janitor.
911
1012
Args:
1113
warn_on_delete_failure: Whether to warn instead of erroring if the janitor fails to delete the expired environment schema / views.
14+
expired_snapshots_batch_size: Maximum number of expired snapshots to clean in a single batch.
1215
"""
1316

1417
warn_on_delete_failure: bool = False
18+
expired_snapshots_batch_size: t.Optional[int] = None
19+
20+
@field_validator("expired_snapshots_batch_size", mode="before")
21+
@classmethod
22+
def _validate_batch_size(cls, value: int) -> int:
23+
batch_size = int(value)
24+
if batch_size <= 0:
25+
raise ValueError("expired_snapshots_batch_size must be greater than 0")
26+
return batch_size

sqlmesh/core/context.py

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@
109109
StateSync,
110110
cleanup_expired_views,
111111
)
112+
from sqlmesh.core.state_sync.common import delete_expired_snapshots
112113
from sqlmesh.core.table_diff import TableDiff
113114
from sqlmesh.core.test import (
114115
ModelTextTestResult,
@@ -2852,19 +2853,14 @@ def _run_janitor(self, ignore_ttl: bool = False) -> None:
28522853
# Clean up expired environments by removing their views and schemas
28532854
self._cleanup_environments(current_ts=current_ts)
28542855

2855-
cleanup_targets = self.state_sync.get_expired_snapshots(
2856-
ignore_ttl=ignore_ttl, current_ts=current_ts
2857-
)
2858-
2859-
# Remove the expired snapshots tables
2860-
self.snapshot_evaluator.cleanup(
2861-
target_snapshots=cleanup_targets,
2862-
on_complete=self.console.update_cleanup_progress,
2856+
delete_expired_snapshots(
2857+
self.state_sync,
2858+
self.snapshot_evaluator,
2859+
current_ts=current_ts,
2860+
ignore_ttl=ignore_ttl,
2861+
console=self.console,
2862+
batch_size=self.config.janitor.expired_snapshots_batch_size,
28632863
)
2864-
2865-
# Delete the expired snapshot records from the state sync
2866-
self.state_sync.delete_expired_snapshots(ignore_ttl=ignore_ttl, current_ts=current_ts)
2867-
28682864
self.state_sync.compact_intervals()
28692865

28702866
def _cleanup_environments(self, current_ts: t.Optional[int] = None) -> None:

sqlmesh/core/state_sync/base.py

Lines changed: 65 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,50 @@ def _schema_version_validator(cls, v: t.Any) -> int:
7272
SCHEMA_VERSION: int = MIN_SCHEMA_VERSION + len(MIGRATIONS) - 1
7373

7474

75+
class BatchBoundary(PydanticModel):
76+
updated_ts: int
77+
name: str
78+
identifier: str
79+
80+
def to_upper_batch_boundary(self) -> UpperBatchBoundary:
81+
return UpperBatchBoundary(
82+
updated_ts=self.updated_ts,
83+
name=self.name,
84+
identifier=self.identifier,
85+
)
86+
87+
def to_lower_batch_boundary(self, batch_size: int) -> LowerBatchBoundary:
88+
return LowerBatchBoundary(
89+
updated_ts=self.updated_ts,
90+
name=self.name,
91+
identifier=self.identifier,
92+
batch_size=batch_size,
93+
)
94+
95+
96+
class UpperBatchBoundary(BatchBoundary):
97+
@classmethod
98+
def include_all_boundary(cls) -> UpperBatchBoundary:
99+
# 9999-12-31T23:59:59.999Z in epoch milliseconds
100+
return UpperBatchBoundary(updated_ts=253_402_300_799_999, name="", identifier="")
101+
102+
103+
class LowerBatchBoundary(BatchBoundary):
104+
batch_size: int
105+
106+
@classmethod
107+
def init_batch_boundary(cls, batch_size: int) -> LowerBatchBoundary:
108+
return LowerBatchBoundary(updated_ts=0, name="", identifier="", batch_size=batch_size)
109+
110+
111+
class ExpiredSnapshotBatch(PydanticModel):
112+
"""A batch of expired snapshots to be cleaned up."""
113+
114+
expired_snapshot_ids: t.Set[SnapshotId]
115+
cleanup_tasks: t.List[SnapshotTableCleanupTask]
116+
batch_boundary: BatchBoundary
117+
118+
75119
class PromotionResult(PydanticModel):
76120
added: t.List[SnapshotTableInfo]
77121
removed: t.List[SnapshotTableInfo]
@@ -315,15 +359,23 @@ def export(self, environment_names: t.Optional[t.List[str]] = None) -> StateStre
315359

316360
@abc.abstractmethod
317361
def get_expired_snapshots(
318-
self, current_ts: t.Optional[int] = None, ignore_ttl: bool = False
319-
) -> t.List[SnapshotTableCleanupTask]:
320-
"""Aggregates the id's of the expired snapshots and creates a list of table cleanup tasks.
362+
self,
363+
*,
364+
batch_boundary: BatchBoundary,
365+
current_ts: t.Optional[int] = None,
366+
ignore_ttl: bool = False,
367+
) -> t.Optional[ExpiredSnapshotBatch]:
368+
"""Returns a single batch of expired snapshots ordered by (updated_ts, name, identifier).
321369
322-
Expired snapshots are snapshots that have exceeded their time-to-live
323-
and are no longer in use within an environment.
370+
Args:
371+
current_ts: Timestamp used to evaluate expiration.
372+
ignore_ttl: If True, include snapshots regardless of TTL (only checks if unreferenced).
373+
batch_boundary: If provided, gets snapshot relative to the given boundary.
374+
If lower boundary then snapshots later than that will be returned (exclusive).
375+
If upper boundary then snapshots earlier than that will be returned (inclusive).
324376
325377
Returns:
326-
The list of table cleanup tasks.
378+
A batch describing expired snapshots or None if no snapshots are pending cleanup.
327379
"""
328380

329381
@abc.abstractmethod
@@ -363,7 +415,10 @@ def delete_snapshots(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> None:
363415

364416
@abc.abstractmethod
365417
def delete_expired_snapshots(
366-
self, ignore_ttl: bool = False, current_ts: t.Optional[int] = None
418+
self,
419+
ignore_ttl: bool = False,
420+
current_ts: t.Optional[int] = None,
421+
upper_batch_boundary: t.Optional[UpperBatchBoundary] = None,
367422
) -> None:
368423
"""Removes expired snapshots.
369424
@@ -373,6 +428,9 @@ def delete_expired_snapshots(
373428
Args:
374429
ignore_ttl: Ignore the TTL on the snapshot when considering it expired. This has the effect of deleting
375430
all snapshots that are not referenced in any environment
431+
current_ts: Timestamp used to evaluate expiration.
432+
upper_batch_boundary: The upper boundary to delete expired snapshots till (inclusive). If not provided,
433+
deletes all expired snapshots.
376434
"""
377435

378436
@abc.abstractmethod

sqlmesh/core/state_sync/cache.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
SnapshotInfoLike,
1212
)
1313
from sqlmesh.core.snapshot.definition import Interval, SnapshotIntervals
14-
from sqlmesh.core.state_sync.base import DelegatingStateSync, StateSync
14+
from sqlmesh.core.state_sync.base import DelegatingStateSync, StateSync, UpperBatchBoundary
1515
from sqlmesh.utils.date import TimeLike, now_timestamp
1616

1717

@@ -108,11 +108,17 @@ def delete_snapshots(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> None:
108108
self.state_sync.delete_snapshots(snapshot_ids)
109109

110110
def delete_expired_snapshots(
111-
self, ignore_ttl: bool = False, current_ts: t.Optional[int] = None
111+
self,
112+
ignore_ttl: bool = False,
113+
current_ts: t.Optional[int] = None,
114+
upper_batch_boundary: t.Optional[UpperBatchBoundary] = None,
112115
) -> None:
113-
current_ts = current_ts or now_timestamp()
114116
self.snapshot_cache.clear()
115-
self.state_sync.delete_expired_snapshots(current_ts=current_ts, ignore_ttl=ignore_ttl)
117+
self.state_sync.delete_expired_snapshots(
118+
upper_batch_boundary=upper_batch_boundary,
119+
ignore_ttl=ignore_ttl,
120+
current_ts=current_ts,
121+
)
116122

117123
def add_snapshots_intervals(self, snapshots_intervals: t.Sequence[SnapshotIntervals]) -> None:
118124
for snapshot_intervals in snapshots_intervals:

sqlmesh/core/state_sync/common.py

Lines changed: 89 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,16 @@
1414
from sqlmesh.utils.pydantic import PydanticModel
1515
from sqlmesh.core.environment import Environment, EnvironmentStatements
1616
from sqlmesh.utils.errors import SQLMeshError
17-
from sqlmesh.core.snapshot import Snapshot
17+
from sqlmesh.core.snapshot import Snapshot, SnapshotEvaluator
1818

1919
if t.TYPE_CHECKING:
2020
from sqlmesh.core.engine_adapter.base import EngineAdapter
21-
from sqlmesh.core.state_sync.base import Versions
21+
from sqlmesh.core.state_sync.base import Versions, ExpiredSnapshotBatch, StateReader, StateSync
2222

2323
logger = logging.getLogger(__name__)
2424

25+
EXPIRED_SNAPSHOT_DEFAULT_BATCH_SIZE = 200
26+
2527

2628
def cleanup_expired_views(
2729
default_adapter: EngineAdapter,
@@ -215,3 +217,88 @@ def __iter__(self) -> t.Iterator[StateStreamContents]:
215217
yield EnvironmentsChunk(environments)
216218

217219
return _StateStream()
220+
221+
222+
def iter_expired_snapshot_batches(
223+
state_reader: StateReader,
224+
*,
225+
current_ts: int,
226+
ignore_ttl: bool = False,
227+
batch_size: t.Optional[int] = None,
228+
) -> t.Iterator[ExpiredSnapshotBatch]:
229+
"""Yields expired snapshot batches.
230+
231+
Args:
232+
state_reader: StateReader instance to query expired snapshots from.
233+
current_ts: Timestamp used to evaluate expiration.
234+
ignore_ttl: If True, include snapshots regardless of TTL (only checks if unreferenced).
235+
batch_size: Maximum number of snapshots to fetch per batch.
236+
"""
237+
from sqlmesh.core.state_sync.base import LowerBatchBoundary
238+
239+
batch_size = batch_size if batch_size is not None else EXPIRED_SNAPSHOT_DEFAULT_BATCH_SIZE
240+
batch_boundary = LowerBatchBoundary.init_batch_boundary(batch_size=batch_size)
241+
242+
while True:
243+
batch = state_reader.get_expired_snapshots(
244+
current_ts=current_ts,
245+
ignore_ttl=ignore_ttl,
246+
batch_boundary=batch_boundary,
247+
)
248+
249+
if batch is None:
250+
return
251+
252+
yield batch
253+
254+
batch_boundary = batch.batch_boundary.to_lower_batch_boundary(batch_size=batch_size)
255+
256+
257+
def delete_expired_snapshots(
258+
state_sync: StateSync,
259+
snapshot_evaluator: SnapshotEvaluator,
260+
*,
261+
current_ts: int,
262+
ignore_ttl: bool = False,
263+
batch_size: t.Optional[int] = None,
264+
console: t.Optional[Console] = None,
265+
) -> None:
266+
"""Delete all expired snapshots in batches.
267+
268+
This helper function encapsulates the logic for deleting expired snapshots in batches,
269+
eliminating code duplication across different use cases.
270+
271+
Args:
272+
state_sync: StateSync instance to query and delete expired snapshots from.
273+
snapshot_evaluator: SnapshotEvaluator instance to clean up tables associated with snapshots.
274+
current_ts: Timestamp used to evaluate expiration.
275+
ignore_ttl: If True, include snapshots regardless of TTL (only checks if unreferenced).
276+
batch_size: Maximum number of snapshots to fetch per batch.
277+
console: Optional console for reporting progress.
278+
279+
Returns:
280+
The total number of deleted expired snapshots.
281+
"""
282+
num_expired_snapshots = 0
283+
for batch in iter_expired_snapshot_batches(
284+
state_reader=state_sync,
285+
current_ts=current_ts,
286+
ignore_ttl=ignore_ttl,
287+
batch_size=batch_size,
288+
):
289+
logger.info(
290+
"Processing batch of size %s and max_updated_ts of %s",
291+
len(batch.expired_snapshot_ids),
292+
batch.batch_boundary.updated_ts,
293+
)
294+
snapshot_evaluator.cleanup(
295+
target_snapshots=batch.cleanup_tasks,
296+
on_complete=console.update_cleanup_progress if console else None,
297+
)
298+
state_sync.delete_expired_snapshots(
299+
upper_batch_boundary=batch.batch_boundary.to_upper_batch_boundary(),
300+
ignore_ttl=ignore_ttl,
301+
)
302+
logger.info("Cleaned up expired snapshots batch")
303+
num_expired_snapshots += len(batch.expired_snapshot_ids)
304+
logger.info("Cleaned up %s expired snapshots", num_expired_snapshots)

sqlmesh/core/state_sync/db/facade.py

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,17 +35,19 @@
3535
SnapshotInfoLike,
3636
SnapshotIntervals,
3737
SnapshotNameVersion,
38-
SnapshotTableCleanupTask,
3938
SnapshotTableInfo,
4039
start_date,
4140
)
4241
from sqlmesh.core.snapshot.definition import (
4342
Interval,
4443
)
4544
from sqlmesh.core.state_sync.base import (
45+
ExpiredSnapshotBatch,
4646
PromotionResult,
4747
StateSync,
4848
Versions,
49+
BatchBoundary,
50+
UpperBatchBoundary,
4951
)
5052
from sqlmesh.core.state_sync.common import (
5153
EnvironmentsChunk,
@@ -261,26 +263,39 @@ def invalidate_environment(self, name: str, protect_prod: bool = True) -> None:
261263
self.environment_state.invalidate_environment(name, protect_prod)
262264

263265
def get_expired_snapshots(
264-
self, current_ts: t.Optional[int] = None, ignore_ttl: bool = False
265-
) -> t.List[SnapshotTableCleanupTask]:
266+
self,
267+
*,
268+
batch_boundary: BatchBoundary,
269+
current_ts: t.Optional[int] = None,
270+
ignore_ttl: bool = False,
271+
) -> t.Optional[ExpiredSnapshotBatch]:
266272
current_ts = current_ts or now_timestamp()
267273
return self.snapshot_state.get_expired_snapshots(
268-
self.environment_state.get_environments(), current_ts=current_ts, ignore_ttl=ignore_ttl
274+
environments=self.environment_state.get_environments(),
275+
current_ts=current_ts,
276+
ignore_ttl=ignore_ttl,
277+
batch_boundary=batch_boundary,
269278
)
270279

271280
def get_expired_environments(self, current_ts: int) -> t.List[EnvironmentSummary]:
272281
return self.environment_state.get_expired_environments(current_ts=current_ts)
273282

274283
@transactional()
275284
def delete_expired_snapshots(
276-
self, ignore_ttl: bool = False, current_ts: t.Optional[int] = None
285+
self,
286+
ignore_ttl: bool = False,
287+
current_ts: t.Optional[int] = None,
288+
upper_batch_boundary: t.Optional[UpperBatchBoundary] = None,
277289
) -> None:
278-
current_ts = current_ts or now_timestamp()
279-
for expired_snapshot_ids, cleanup_targets in self.snapshot_state._get_expired_snapshots(
280-
self.environment_state.get_environments(), ignore_ttl=ignore_ttl, current_ts=current_ts
281-
):
282-
self.snapshot_state.delete_snapshots(expired_snapshot_ids)
283-
self.interval_state.cleanup_intervals(cleanup_targets, expired_snapshot_ids)
290+
upper_batch_boundary = upper_batch_boundary or UpperBatchBoundary.include_all_boundary()
291+
batch = self.get_expired_snapshots(
292+
ignore_ttl=ignore_ttl,
293+
current_ts=current_ts,
294+
batch_boundary=upper_batch_boundary,
295+
)
296+
if batch and batch.expired_snapshot_ids:
297+
self.snapshot_state.delete_snapshots(batch.expired_snapshot_ids)
298+
self.interval_state.cleanup_intervals(batch.cleanup_tasks, batch.expired_snapshot_ids)
284299

285300
@transactional()
286301
def delete_expired_environments(

0 commit comments

Comments
 (0)