Skip to content

Commit de30f53

Browse files
eakmanrqtobymao
andauthored
Fix latest date (#633)
* wip - try fix latest * refactor * add test for now * more fixes * fmt * fmt --------- Co-authored-by: tobymao <toby.mao@gmail.com>
1 parent 9178187 commit de30f53

File tree

6 files changed

+108
-49
lines changed

6 files changed

+108
-49
lines changed

sqlmesh/core/plan/definition.py

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from sqlmesh.utils.date import (
2424
TimeLike,
2525
make_inclusive,
26+
make_inclusive_end,
2627
now,
2728
to_date,
2829
to_ds,
@@ -163,24 +164,35 @@ def set_start(self, new_start: TimeLike) -> None:
163164
self._start = new_start
164165
self.__missing_intervals = None
165166

167+
def _get_end_date(self, end_and_units: t.List[t.Tuple[int, IntervalUnit]]) -> TimeLike:
168+
if end_and_units:
169+
end, unit = max(end_and_units)
170+
171+
if unit == IntervalUnit.DAY:
172+
return to_date(make_inclusive_end(end))
173+
return end
174+
return now()
175+
166176
@property
167177
def end(self) -> TimeLike:
168178
"""Returns the end of the plan or now."""
169179
if not self._end or not self.override_end:
170180
if self._missing_intervals:
171-
end, interval_unit = max(
181+
return self._get_end_date(
172182
[
173183
(end, snapshot.model.interval_unit())
174184
for snapshot in self.snapshots
175185
if snapshot.version_get_or_generate() in self._missing_intervals
176186
for _, end in self._missing_intervals[snapshot.version_get_or_generate()]
177187
]
178188
)
179-
if interval_unit == IntervalUnit.DAY:
180-
return to_date(make_inclusive(self.start, end)[1])
181-
return end
182-
else:
183-
return scheduler.latest_end_date(self.snapshots)
189+
return self._get_end_date(
190+
[
191+
(snapshot.intervals[-1][1], snapshot.model.interval_unit())
192+
for snapshot in self.snapshots
193+
if snapshot.intervals
194+
]
195+
)
184196
return self._end
185197

186198
@end.setter

sqlmesh/core/scheduler.py

Lines changed: 1 addition & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,9 @@
22

33
import logging
44
import typing as t
5-
from datetime import date, datetime
5+
from datetime import datetime
66

77
from sqlmesh.core.console import Console, get_console
8-
from sqlmesh.core.model.meta import IntervalUnit
98
from sqlmesh.core.snapshot import (
109
Snapshot,
1110
SnapshotEvaluator,
@@ -18,9 +17,7 @@
1817
from sqlmesh.utils.dag import DAG
1918
from sqlmesh.utils.date import (
2019
TimeLike,
21-
make_inclusive,
2220
now,
23-
to_date,
2421
to_datetime,
2522
validate_date_range,
2623
yesterday,
@@ -381,27 +378,6 @@ def earliest_start_date(snapshots: t.Iterable[Snapshot]) -> datetime:
381378
return yesterday()
382379

383380

384-
def latest_end_date(snapshots: t.Iterable[Snapshot]) -> datetime | date:
385-
"""Get the latest end date from a collection of snapshots.
386-
387-
Args:
388-
snapshots: Snapshots to find latest end date.
389-
Returns:
390-
The latest end date or now if none is found.
391-
"""
392-
end_date_and_intervals = [
393-
(snapshot.intervals[-1][1], snapshot.model.interval_unit())
394-
for snapshot in snapshots
395-
if snapshot.intervals
396-
]
397-
if end_date_and_intervals:
398-
end_date, interval = max(end_date_and_intervals)
399-
if interval == IntervalUnit.DAY:
400-
return to_date(make_inclusive(end_date, end_date)[1])
401-
return to_datetime(end_date)
402-
return now()
403-
404-
405381
def _batched_intervals(params: SnapshotToBatches) -> SnapshotToBatches:
406382
batches = {}
407383

sqlmesh/core/snapshot/definition.py

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
from sqlmesh.core.model.meta import HookCall
2323
from sqlmesh.utils.date import (
2424
TimeLike,
25-
make_inclusive,
25+
is_date,
2626
now,
2727
now_timestamp,
2828
to_datetime,
@@ -446,9 +446,10 @@ def remove_interval(self, start: TimeLike, end: TimeLike) -> None:
446446
self.dev_intervals = remove_interval(self.dev_intervals, *interval)
447447

448448
def _inclusive_exclusive(self, start: TimeLike, end: TimeLike) -> t.Tuple[int, int]:
449-
start_dt, end_dt = make_inclusive(start, end)
450-
start_ts = to_timestamp(self.model.cron_floor(start_dt))
451-
end_ts = to_timestamp(self.model.cron_next(end_dt))
449+
start_ts = to_timestamp(self.model.cron_floor(start))
450+
end_ts = to_timestamp(
451+
self.model.cron_next(end) if is_date(end) else self.model.cron_floor(end)
452+
)
452453

453454
if start_ts >= end_ts:
454455
raise ValueError("`end` must be >= `start`")
@@ -475,27 +476,31 @@ def missing_intervals(
475476
Args:
476477
start: The start date/time of the interval (inclusive)
477478
end: The end date/time of the interval (inclusive)
479+
latest: The date/time to use for latest (inclusive)
478480
479481
Returns:
480482
A list of all the missing intervals as epoch timestamps.
481483
"""
482484
if self.is_embedded_kind:
483485
return []
484486

485-
start_dt, end_dt = make_inclusive(start, self.model.cron_floor(end))
486-
487487
if self.is_full_kind or self.is_view_kind or self.is_seed_kind:
488-
latest_dt = to_datetime(self.model.cron_floor(latest or now()))
489-
latest_ts = to_timestamp(latest_dt)
488+
latest = latest or now()
489+
490+
latest_start, latest_end = self._inclusive_exclusive(
491+
latest if is_date(latest) else self.model.cron_prev(self.model.cron_floor(latest)),
492+
latest,
493+
)
490494
# if the latest ts is stored in the last interval, nothing is missing
491495
# else returns the latest ts with the exclusive end ts.
492-
if self.intervals and self.intervals[-1][1] >= latest_ts:
496+
if self.intervals and self.intervals[-1][1] >= latest_end:
493497
return []
494-
return [(to_timestamp(self.model.cron_prev(latest_dt)), latest_ts)]
498+
return [(latest_start, latest_end)]
495499

496500
missing = []
497-
dates = list(croniter_range(start_dt, end_dt, self.model.normalized_cron()))
498-
size = len(dates)
501+
start_dt, end_dt = (to_datetime(ts) for ts in self._inclusive_exclusive(start, end))
502+
dates = tuple(croniter_range(start_dt, end_dt, self.model.normalized_cron()))
503+
size = len(dates) - 1
499504

500505
for i in range(size):
501506
current_ts = to_timestamp(dates[i])

sqlmesh/utils/date.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -232,11 +232,14 @@ def make_inclusive(start: TimeLike, end: TimeLike) -> Interval:
232232
Returns:
233233
A tuple of inclusive datetime objects.
234234
"""
235-
start_dt = to_datetime(start)
235+
return (to_datetime(start), make_inclusive_end(end))
236+
237+
238+
def make_inclusive_end(end: TimeLike) -> datetime:
236239
end_dt = to_datetime(end)
237240
if is_date(end):
238241
end_dt = end_dt + timedelta(days=1)
239-
return (start_dt, end_dt - timedelta(milliseconds=1))
242+
return end_dt - timedelta(milliseconds=1)
240243

241244

242245
def preserve_time_like_kind(input_value: TimeLike, output_value: TimeLike) -> TimeLike:

tests/core/test_plan.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ def test_forward_only_dev(make_snapshot, mocker: MockerFixture):
7878
yesterday_ds_mock = mocker.patch("sqlmesh.core.scheduler.yesterday")
7979
yesterday_ds_mock.return_value = expected_start
8080

81-
now_ds_mock = mocker.patch("sqlmesh.core.scheduler.now")
81+
now_ds_mock = mocker.patch("sqlmesh.core.plan.definition.now")
8282
now_ds_mock.return_value = expected_end
8383
state_reader_mock.missing_intervals.return_value = {}
8484

tests/core/test_snapshot.py

Lines changed: 66 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
from sqlmesh.core.model import (
1111
IncrementalByTimeRangeKind,
1212
Model,
13+
ModelKind,
14+
ModelKindName,
1315
SeedKind,
1416
SqlModel,
1517
create_seed_model,
@@ -51,6 +53,20 @@ def model():
5153
)
5254

5355

56+
@pytest.fixture
57+
def full_refresh_model():
58+
return SqlModel(
59+
name="fr_model",
60+
kind=ModelKind(name=ModelKindName.FULL),
61+
owner="owner",
62+
dialect="spark",
63+
cron="1 0 * * *",
64+
batch_size=30,
65+
start="2020-01-01",
66+
query=parse_one("SELECT @EACH([1, 2], x -> x), ds FROM parent.tbl"),
67+
)
68+
69+
5470
@pytest.fixture
5571
def snapshot(
5672
model: Model,
@@ -70,6 +86,25 @@ def snapshot(
7086
return snapshot
7187

7288

89+
@pytest.fixture
90+
def full_refresh_snapshot(
91+
full_refresh_model: Model,
92+
parent_model: Model,
93+
monkeypatch: MonkeyPatch,
94+
mocker: MockerFixture,
95+
make_snapshot,
96+
):
97+
mock = mocker.Mock()
98+
mock.return_value = to_datetime("2022-09-23T00:12:53+00:00")
99+
monkeypatch.setattr("sqlmesh.utils.date.now", mock)
100+
full_refresh_snapshot = make_snapshot(
101+
full_refresh_model,
102+
models={parent_model.name: parent_model, full_refresh_model.name: full_refresh_model},
103+
)
104+
full_refresh_snapshot.version = full_refresh_snapshot.fingerprint
105+
return full_refresh_snapshot
106+
107+
73108
def test_json(snapshot: Snapshot):
74109
assert json.loads(snapshot.json()) == {
75110
"created_ts": 1663891973000,
@@ -151,11 +186,11 @@ def test_add_interval(snapshot: Snapshot, make_snapshot):
151186
]
152187
snapshot.add_interval("2019-01-01 00:00:00", "2020-01-31 00:00:01")
153188
assert snapshot.intervals == [
154-
(to_timestamp("2019-01-01"), to_timestamp("2020-02-01")),
189+
(to_timestamp("2019-01-01"), to_timestamp("2020-01-31")),
155190
]
156-
snapshot.add_interval("2018-12-31 23:59:59", "2020-01-31 00:00:01")
191+
snapshot.add_interval("2018-12-31 23:59:59", "2020-01-31 12:00:01")
157192
assert snapshot.intervals == [
158-
(to_timestamp("2018-12-31"), to_timestamp("2020-02-01")),
193+
(to_timestamp("2018-12-31"), to_timestamp("2020-01-31")),
159194
]
160195

161196
new_snapshot = make_snapshot(snapshot.model)
@@ -207,6 +242,34 @@ def test_missing_intervals(snapshot: Snapshot):
207242
]
208243

209244

245+
def test_missing_interval_latest(
246+
full_refresh_snapshot: Snapshot, monkeypatch: MonkeyPatch, mocker: MockerFixture
247+
):
248+
mock = mocker.Mock()
249+
mock.return_value = to_datetime("2020-01-05T00:12:53+00:00")
250+
monkeypatch.setattr("sqlmesh.core.snapshot.definition.now", mock)
251+
full_refresh_snapshot.add_interval("2020-01-01", "2020-01-01")
252+
assert full_refresh_snapshot.missing_intervals("2020-01-01", "2020-01-01", "2020-01-01") == []
253+
assert full_refresh_snapshot.missing_intervals("2020-01-02", "2020-01-02", "2020-01-02") == [
254+
(to_timestamp("2020-01-02"), to_timestamp("2020-01-03"))
255+
]
256+
assert full_refresh_snapshot.missing_intervals("2020-01-02", "2020-01-03", "2020-01-03") == [
257+
(to_timestamp("2020-01-03"), to_timestamp("2020-01-04"))
258+
]
259+
assert full_refresh_snapshot.missing_intervals("2020-01-02", "2020-01-03", "2020-01-04") == [
260+
(to_timestamp("2020-01-04"), to_timestamp("2020-01-05"))
261+
]
262+
assert full_refresh_snapshot.missing_intervals(
263+
"2020-01-02", "2020-01-03", "2020-01-03 01:00:00"
264+
) == [(to_timestamp("2020-01-02"), to_timestamp("2020-01-03"))]
265+
assert full_refresh_snapshot.missing_intervals(
266+
"2020-01-02", "2020-01-03", "2020-01-04 23:59:59"
267+
) == [(to_timestamp("2020-01-03"), to_timestamp("2020-01-04"))]
268+
assert full_refresh_snapshot.missing_intervals("2020-01-02", "2020-01-03") == [
269+
(to_timestamp("2020-01-04"), to_timestamp("2020-01-05"))
270+
]
271+
272+
210273
def test_remove_intervals(snapshot: Snapshot):
211274
snapshot.add_interval("2020-01-01", "2020-01-01")
212275
snapshot.remove_interval("2020-01-01", "2020-01-01")

0 commit comments

Comments
 (0)