Skip to content

Commit c95209a

Browse files
committed
use range instead of single boundary
1 parent 7a775c8 commit c95209a

File tree

6 files changed

+442
-214
lines changed

6 files changed

+442
-214
lines changed

sqlmesh/core/state_sync/base.py

Lines changed: 11 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
from sqlmesh import migrations
1212
from sqlmesh.core.environment import (
1313
Environment,
14-
EnvironmentNamingInfo,
1514
EnvironmentStatements,
1615
EnvironmentSummary,
1716
)
@@ -21,17 +20,20 @@
2120
SnapshotIdLike,
2221
SnapshotIdAndVersionLike,
2322
SnapshotInfoLike,
24-
SnapshotTableCleanupTask,
25-
SnapshotTableInfo,
2623
SnapshotNameVersion,
2724
SnapshotIdAndVersion,
2825
)
2926
from sqlmesh.core.snapshot.definition import Interval, SnapshotIntervals
3027
from sqlmesh.utils import major_minor
3128
from sqlmesh.utils.date import TimeLike
3229
from sqlmesh.utils.errors import SQLMeshError
33-
from sqlmesh.utils.pydantic import PydanticModel, ValidationInfo, field_validator
34-
from sqlmesh.core.state_sync.common import StateStream
30+
from sqlmesh.utils.pydantic import PydanticModel, field_validator
31+
from sqlmesh.core.state_sync.common import (
32+
StateStream,
33+
ExpiredSnapshotBatch,
34+
PromotionResult,
35+
ExpiredBatchRange,
36+
)
3537

3638
logger = logging.getLogger(__name__)
3739

@@ -72,64 +74,6 @@ def _schema_version_validator(cls, v: t.Any) -> int:
7274
SCHEMA_VERSION: int = MIN_SCHEMA_VERSION + len(MIGRATIONS) - 1
7375

7476

75-
class BatchBoundary(PydanticModel):
76-
updated_ts: int
77-
name: str
78-
identifier: str
79-
80-
def to_upper_batch_boundary(self) -> UpperBatchBoundary:
81-
return UpperBatchBoundary(
82-
updated_ts=self.updated_ts,
83-
name=self.name,
84-
identifier=self.identifier,
85-
)
86-
87-
def to_lower_batch_boundary(self, batch_size: int) -> LowerBatchBoundary:
88-
return LowerBatchBoundary(
89-
updated_ts=self.updated_ts,
90-
name=self.name,
91-
identifier=self.identifier,
92-
batch_size=batch_size,
93-
)
94-
95-
96-
class UpperBatchBoundary(BatchBoundary):
97-
@classmethod
98-
def include_all_boundary(cls) -> UpperBatchBoundary:
99-
# 9999-12-31T23:59:59.999Z in epoch milliseconds
100-
return UpperBatchBoundary(updated_ts=253_402_300_799_999, name="", identifier="")
101-
102-
103-
class LowerBatchBoundary(BatchBoundary):
104-
batch_size: int
105-
106-
@classmethod
107-
def init_batch_boundary(cls, batch_size: int) -> LowerBatchBoundary:
108-
return LowerBatchBoundary(updated_ts=0, name="", identifier="", batch_size=batch_size)
109-
110-
111-
class ExpiredSnapshotBatch(PydanticModel):
112-
"""A batch of expired snapshots to be cleaned up."""
113-
114-
expired_snapshot_ids: t.Set[SnapshotId]
115-
cleanup_tasks: t.List[SnapshotTableCleanupTask]
116-
batch_boundary: BatchBoundary
117-
118-
119-
class PromotionResult(PydanticModel):
120-
added: t.List[SnapshotTableInfo]
121-
removed: t.List[SnapshotTableInfo]
122-
removed_environment_naming_info: t.Optional[EnvironmentNamingInfo]
123-
124-
@field_validator("removed_environment_naming_info")
125-
def _validate_removed_environment_naming_info(
126-
cls, v: t.Optional[EnvironmentNamingInfo], info: ValidationInfo
127-
) -> t.Optional[EnvironmentNamingInfo]:
128-
if v and not info.data.get("removed"):
129-
raise ValueError("removed_environment_naming_info must be None if removed is empty")
130-
return v
131-
132-
13377
class StateReader(abc.ABC):
13478
"""Abstract base class for read-only operations on snapshot and environment state."""
13579

@@ -361,7 +305,7 @@ def export(self, environment_names: t.Optional[t.List[str]] = None) -> StateStream:
361305
def get_expired_snapshots(
362306
self,
363307
*,
364-
batch_boundary: BatchBoundary,
308+
batch_range: ExpiredBatchRange,
365309
current_ts: t.Optional[int] = None,
366310
ignore_ttl: bool = False,
367311
) -> t.Optional[ExpiredSnapshotBatch]:
@@ -370,9 +314,7 @@ def get_expired_snapshots(
370314
Args:
371315
current_ts: Timestamp used to evaluate expiration.
372316
ignore_ttl: If True, include snapshots regardless of TTL (only checks if unreferenced).
373-
batch_boundary: If provided, gets snapshot relative to the given boundary.
374-
If lower boundary then snapshots later than that will be returned (exclusive).
375-
If upper boundary then snapshots earlier than that will be returned (inclusive).
317+
batch_range: The range of the batch to fetch.
376318
377319
Returns:
378320
A batch describing expired snapshots or None if no snapshots are pending cleanup.
@@ -416,21 +358,20 @@ def delete_snapshots(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> None:
416358
@abc.abstractmethod
417359
def delete_expired_snapshots(
418360
self,
361+
batch_range: ExpiredBatchRange,
419362
ignore_ttl: bool = False,
420363
current_ts: t.Optional[int] = None,
421-
upper_batch_boundary: t.Optional[UpperBatchBoundary] = None,
422364
) -> None:
423365
"""Removes expired snapshots.
424366
425367
Expired snapshots are snapshots that have exceeded their time-to-live
426368
and are no longer in use within an environment.
427369
428370
Args:
371+
batch_range: The range of snapshots to delete in this batch.
429372
ignore_ttl: Ignore the TTL on the snapshot when considering it expired. This has the effect of deleting
430373
all snapshots that are not referenced in any environment
431374
current_ts: Timestamp used to evaluate expiration.
432-
upper_batch_boundary: The upper boundary to delete expired snapshots till (inclusive). If not provided,
433-
deletes all expired snapshots.
434375
"""
435376

436377
@abc.abstractmethod

sqlmesh/core/state_sync/cache.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@
1111
SnapshotInfoLike,
1212
)
1313
from sqlmesh.core.snapshot.definition import Interval, SnapshotIntervals
14-
from sqlmesh.core.state_sync.base import DelegatingStateSync, StateSync, UpperBatchBoundary
14+
from sqlmesh.core.state_sync.base import DelegatingStateSync, StateSync
15+
from sqlmesh.core.state_sync.common import ExpiredBatchRange
1516
from sqlmesh.utils.date import TimeLike, now_timestamp
1617

1718

@@ -109,13 +110,13 @@ def delete_snapshots(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> None:
109110

110111
def delete_expired_snapshots(
111112
self,
113+
batch_range: ExpiredBatchRange,
112114
ignore_ttl: bool = False,
113115
current_ts: t.Optional[int] = None,
114-
upper_batch_boundary: t.Optional[UpperBatchBoundary] = None,
115116
) -> None:
116117
self.snapshot_cache.clear()
117118
self.state_sync.delete_expired_snapshots(
118-
upper_batch_boundary=upper_batch_boundary,
119+
batch_range=batch_range,
119120
ignore_ttl=ignore_ttl,
120121
current_ts=current_ts,
121122
)

0 commit comments

Comments
 (0)