Skip to content

Commit f645909

Browse files
authored
chore: add interval inclusive/exclusive method (#2543)
* chore: add interval inclusive/exclusive method * add add_snapshot_intervals method
1 parent 9a8f9ca commit f645909

File tree

4 files changed

+104
-49
lines changed

4 files changed

+104
-49
lines changed

sqlmesh/core/state_sync/base.py

Lines changed: 37 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
SnapshotTableCleanupTask,
1919
SnapshotTableInfo,
2020
)
21-
from sqlmesh.core.snapshot.definition import Interval
21+
from sqlmesh.core.snapshot.definition import Interval, SnapshotIntervals
2222
from sqlmesh.utils import major_minor
2323
from sqlmesh.utils.date import TimeLike
2424
from sqlmesh.utils.errors import SQLMeshError
@@ -303,26 +303,6 @@ def invalidate_environment(self, name: str) -> None:
303303
name: The name of the environment to invalidate.
304304
"""
305305

306-
@abc.abstractmethod
307-
def add_interval(
308-
self,
309-
snapshot: Snapshot,
310-
start: TimeLike,
311-
end: TimeLike,
312-
is_dev: bool = False,
313-
) -> None:
314-
"""Add an interval to a snapshot and sync it to the store.
315-
316-
Snapshots must be pushed before adding intervals to them.
317-
318-
Args:
319-
snapshot: The snapshot like object to add an interval to.
320-
start: The start of the interval to add.
321-
end: The end of the interval to add.
322-
is_dev: Indicates whether the given interval is being added while in
323-
development mode.is_dev.
324-
"""
325-
326306
@abc.abstractmethod
327307
def remove_interval(
328308
self,
@@ -429,6 +409,42 @@ def migrate(
429409
def rollback(self) -> None:
430410
"""Rollback to previous backed up state."""
431411

412+
@abc.abstractmethod
413+
def _add_snapshot_intervals(self, snapshot_intervals: SnapshotIntervals) -> None:
414+
"""Add snapshot intervals to state
415+
416+
Args:
417+
snapshot_intervals: The snapshot intervals to add.
418+
"""
419+
420+
def add_interval(
421+
self,
422+
snapshot: Snapshot,
423+
start: TimeLike,
424+
end: TimeLike,
425+
is_dev: bool = False,
426+
) -> None:
427+
"""Add an interval to a snapshot and sync it to the store.
428+
429+
Args:
430+
snapshot: The snapshot like object to add an interval to.
431+
start: The start of the interval to add.
432+
end: The end of the interval to add.
433+
is_dev: Indicates whether the given interval is being added while in development mode
434+
"""
435+
start_ts, end_ts = snapshot.inclusive_exclusive(start, end, strict=False)
436+
if not snapshot.version:
437+
raise SQLMeshError("Snapshot version must be set to add an interval.")
438+
intervals = [(start_ts, end_ts)]
439+
snapshot_intervals = SnapshotIntervals(
440+
name=snapshot.name,
441+
identifier=snapshot.identifier,
442+
version=snapshot.version,
443+
intervals=intervals if not is_dev else [],
444+
dev_intervals=intervals if is_dev else [],
445+
)
446+
self._add_snapshot_intervals(snapshot_intervals)
447+
432448

433449
class DelegatingStateSync(StateSync):
434450
def __init__(self, state_sync: StateSync) -> None:

sqlmesh/core/state_sync/cache.py

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
SnapshotInfoLike,
1212
SnapshotTableCleanupTask,
1313
)
14-
from sqlmesh.core.snapshot.definition import Interval
14+
from sqlmesh.core.snapshot.definition import Interval, SnapshotIntervals
1515
from sqlmesh.core.state_sync.base import DelegatingStateSync, StateSync
1616
from sqlmesh.utils.date import TimeLike, now_timestamp
1717

@@ -129,15 +129,9 @@ def delete_expired_snapshots(self) -> t.List[SnapshotTableCleanupTask]:
129129
self.snapshot_cache.clear()
130130
return self.state_sync.delete_expired_snapshots()
131131

132-
def add_interval(
133-
self,
134-
snapshot: Snapshot,
135-
start: TimeLike,
136-
end: TimeLike,
137-
is_dev: bool = False,
138-
) -> None:
139-
self.snapshot_cache.pop(snapshot.snapshot_id, None)
140-
self.state_sync.add_interval(snapshot, start, end, is_dev)
132+
def _add_snapshot_intervals(self, snapshot_intervals: SnapshotIntervals) -> None:
133+
self.snapshot_cache.pop(snapshot_intervals.snapshot_id, None)
134+
self.state_sync._add_snapshot_intervals(snapshot_intervals)
141135

142136
def remove_interval(
143137
self,

sqlmesh/core/state_sync/engine_adapter.py

Lines changed: 59 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -624,27 +624,48 @@ def add_interval(
624624
end: TimeLike,
625625
is_dev: bool = False,
626626
) -> None:
627-
start_ts, end_ts = snapshot.inclusive_exclusive(start, end, strict=False)
628-
if start_ts >= end_ts:
629-
logger.info(
630-
"Skipping partial interval (%s, %s) for snapshot %s",
631-
start,
632-
end,
633-
snapshot.snapshot_id,
634-
)
635-
return
627+
super().add_interval(snapshot, start, end, is_dev)
636628

637-
logger.info(
638-
"Adding %s (%s, %s) for snapshot %s",
639-
"dev interval" if is_dev else "interval",
640-
start_ts,
641-
end_ts,
642-
snapshot.snapshot_id,
643-
)
629+
@transactional()
630+
def _add_snapshot_intervals(self, snapshot_intervals: SnapshotIntervals) -> None:
631+
def remove_partial_intervals(
632+
intervals: t.List[Interval], snapshot_id: SnapshotId, *, is_dev: bool
633+
) -> t.List[Interval]:
634+
results = []
635+
for start_ts, end_ts in intervals:
636+
if start_ts < end_ts:
637+
logger.info(
638+
"Adding %s (%s, %s) for snapshot %s",
639+
"dev interval" if is_dev else "interval",
640+
start_ts,
641+
end_ts,
642+
snapshot_id,
643+
)
644+
results.append((start_ts, end_ts))
645+
else:
646+
logger.info(
647+
"Skipping partial interval (%s, %s) for snapshot %s",
648+
start_ts,
649+
end_ts,
650+
snapshot_intervals.snapshot_id,
651+
)
652+
return results
644653

654+
snapshot_intervals = snapshot_intervals.copy(
655+
update={
656+
"intervals": remove_partial_intervals(
657+
snapshot_intervals.intervals, snapshot_intervals.snapshot_id, is_dev=False
658+
),
659+
"dev_intervals": remove_partial_intervals(
660+
snapshot_intervals.dev_intervals, snapshot_intervals.snapshot_id, is_dev=True
661+
),
662+
}
663+
)
664+
if not snapshot_intervals.intervals and not snapshot_intervals.dev_intervals:
665+
return
645666
self.engine_adapter.insert_append(
646667
self.intervals_table,
647-
_intervals_to_df([(snapshot, (start_ts, end_ts))], is_dev, False),
668+
_snapshot_interval_to_df(snapshot_intervals, is_removed=False),
648669
columns_to_types=self._interval_columns_to_types,
649670
)
650671

@@ -1312,6 +1333,27 @@ def _intervals_to_df(
13121333
)
13131334

13141335

1336+
def _snapshot_interval_to_df(
1337+
snapshot_intervals: SnapshotIntervals,
1338+
is_removed: bool = False,
1339+
) -> pd.DataFrame:
1340+
return pd.DataFrame(
1341+
[
1342+
_interval_to_df(
1343+
snapshot_intervals,
1344+
start_ts,
1345+
end_ts,
1346+
is_dev=is_dev,
1347+
is_removed=is_removed,
1348+
)
1349+
for is_dev in (False, True)
1350+
for start_ts, end_ts in getattr(
1351+
snapshot_intervals, "dev_intervals" if is_dev else "intervals"
1352+
)
1353+
]
1354+
)
1355+
1356+
13151357
def _interval_to_df(
13161358
snapshot: t.Union[SnapshotInfoLike, SnapshotIntervals],
13171359
start_ts: int,

sqlmesh/schedulers/airflow/state_sync.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
SnapshotInfoLike,
1313
SnapshotTableCleanupTask,
1414
)
15-
from sqlmesh.core.snapshot.definition import Interval
15+
from sqlmesh.core.snapshot.definition import Interval, SnapshotIntervals
1616
from sqlmesh.core.state_sync import StateSync, Versions
1717
from sqlmesh.core.state_sync.base import PromotionResult
1818
from sqlmesh.schedulers.airflow.client import AirflowClient
@@ -219,6 +219,9 @@ def add_interval(
219219
"""
220220
raise NotImplementedError("Adding intervals is not supported by the Airflow state sync.")
221221

222+
def _add_snapshot_intervals(self, snapshot_intervals: SnapshotIntervals) -> None:
223+
raise NotImplementedError("Adding intervals is not supported by the Airflow state sync.")
224+
222225
def remove_interval(
223226
self,
224227
snapshot_intervals: t.Sequence[t.Tuple[SnapshotInfoLike, Interval]],

0 commit comments

Comments
 (0)