Skip to content

Commit 6797d06

Browse files
committed
Differentiate last_altered_ts from dev_last_altered_ts
1 parent 7982c31 commit 6797d06

File tree

4 files changed

+60
-23
lines changed

4 files changed

+60
-23
lines changed

sqlmesh/core/signal.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,20 @@ class signal(registry_decorator):
4242

4343
@signal()
4444
def freshness(batch: DatetimeRanges, snapshot: Snapshot, context: ExecutionContext) -> bool:
45+
deployability_index = context.deployability_index
4546
adapter = context.engine_adapter
46-
if not snapshot.last_altered_ts or not adapter.SUPPORTS_EXTERNAL_MODEL_FRESHNESS:
47+
48+
if not deployability_index or not adapter.SUPPORTS_EXTERNAL_MODEL_FRESHNESS:
49+
return True
50+
51+
last_altered_ts = (
52+
snapshot.last_altered_ts
53+
if deployability_index.is_deployable(snapshot)
54+
else snapshot.dev_last_altered_ts
55+
)
56+
if not last_altered_ts:
4757
return True
4858

49-
adapter = context.engine_adapter
5059
parent_snapshots = {context.snapshots[p.name] for p in snapshot.parents}
5160
if len(parent_snapshots) != len(snapshot.node.depends_on) or not all(
5261
p.is_external for p in parent_snapshots
@@ -57,7 +66,7 @@ def freshness(batch: DatetimeRanges, snapshot: Snapshot, context: ExecutionConte
5766
# Finding new data means that the upstream depedencies have been altered
5867
# since the last time the model was evaluated
5968
upstream_dep_has_new_data = any(
60-
upstream_last_altered_ts > snapshot.last_altered_ts
69+
upstream_last_altered_ts > last_altered_ts
6170
for upstream_last_altered_ts in adapter.get_external_model_freshness(
6271
[p.name for p in parent_snapshots]
6372
)

sqlmesh/core/snapshot/definition.py

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,7 @@ class SnapshotIntervals(PydanticModel):
186186
dev_intervals: Intervals = []
187187
pending_restatement_intervals: Intervals = []
188188
last_altered_ts: t.Optional[int] = None
189+
dev_last_altered_ts: t.Optional[int] = None
189190

190191
@property
191192
def snapshot_id(self) -> t.Optional[SnapshotId]:
@@ -206,6 +207,12 @@ def add_dev_interval(self, start: int, end: int) -> None:
206207
def add_pending_restatement_interval(self, start: int, end: int) -> None:
207208
self._add_interval(start, end, "pending_restatement_intervals")
208209

210+
def add_last_altered_ts(self, last_altered_ts: t.Optional[int]) -> None:
211+
self._add_last_altered_ts(last_altered_ts, "last_altered_ts")
212+
213+
def add_dev_last_altered_ts(self, last_altered_ts: t.Optional[int]) -> None:
214+
self._add_last_altered_ts(last_altered_ts, "dev_last_altered_ts")
215+
209216
def remove_interval(self, start: int, end: int) -> None:
210217
self._remove_interval(start, end, "intervals")
211218

@@ -225,6 +232,13 @@ def _add_interval(self, start: int, end: int, interval_attr: str) -> None:
225232
target_intervals = merge_intervals([*target_intervals, (start, end)])
226233
setattr(self, interval_attr, target_intervals)
227234

235+
def _add_last_altered_ts(
236+
self, last_altered_ts: t.Optional[int], last_altered_attr: str
237+
) -> None:
238+
if last_altered_ts:
239+
existing_last_altered_ts = getattr(self, last_altered_attr)
240+
setattr(self, last_altered_attr, max(existing_last_altered_ts or -1, last_altered_ts))
241+
228242
def _remove_interval(self, start: int, end: int, interval_attr: str) -> None:
229243
target_intervals = getattr(self, interval_attr)
230244
target_intervals = remove_interval(target_intervals, start, end)
@@ -717,6 +731,7 @@ class Snapshot(PydanticModel, SnapshotInfoMixin):
717731
# Physical table last modified timestamp, not to be confused with the "updated_ts" field
718732
# which is for the snapshot record itself
719733
last_altered_ts: t.Optional[int] = None
734+
dev_last_altered_ts: t.Optional[int] = None
720735

721736
@field_validator("ttl")
722737
@classmethod
@@ -756,13 +771,6 @@ def hydrate_with_intervals_by_version(
756771
for interval in snapshot_intervals:
757772
snapshot.merge_intervals(interval)
758773

759-
# Differentiate last_altered_ts between snapshots with shared version but
760-
# different dev versions e.g prod vs FORWARD_ONLY dev
761-
if snapshot.dev_version == interval.dev_version and interval.last_altered_ts:
762-
snapshot.last_altered_ts = max(
763-
snapshot.last_altered_ts or -1, interval.last_altered_ts
764-
)
765-
766774
result.append(snapshot)
767775

768776
return result
@@ -969,12 +977,20 @@ def merge_intervals(self, other: t.Union[Snapshot, SnapshotIntervals]) -> None:
969977
if not apply_effective_from or end <= effective_from_ts:
970978
self.add_interval(start, end)
971979

980+
if other.last_altered_ts:
981+
self.last_altered_ts = max(self.last_altered_ts or -1, other.last_altered_ts)
982+
972983
if self.dev_version == other.dev_version:
973984
# Merge dev intervals if the dev versions match which would mean
974985
# that this and the other snapshot are pointing to the same dev table.
975986
for start, end in other.dev_intervals:
976987
self.add_interval(start, end, is_dev=True)
977988

989+
if other.dev_last_altered_ts:
990+
self.dev_last_altered_ts = max(
991+
self.dev_last_altered_ts or -1, other.dev_last_altered_ts
992+
)
993+
978994
self.pending_restatement_intervals = merge_intervals(
979995
[*self.pending_restatement_intervals, *other.pending_restatement_intervals]
980996
)

sqlmesh/core/state_sync/db/interval.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -309,12 +309,6 @@ def _get_snapshot_intervals(
309309
identifier=identifier,
310310
version=version,
311311
dev_version=dev_version,
312-
last_altered_ts=last_altered_ts,
313-
)
314-
315-
if last_altered_ts:
316-
intervals[merge_key].last_altered_ts = max(
317-
intervals[merge_key].last_altered_ts or 0, last_altered_ts
318312
)
319313

320314
if pending_restatement_interval_merge_key not in intervals:
@@ -337,8 +331,10 @@ def _get_snapshot_intervals(
337331
else:
338332
if is_dev:
339333
intervals[merge_key].add_dev_interval(start, end)
334+
intervals[merge_key].add_dev_last_altered_ts(last_altered_ts)
340335
else:
341336
intervals[merge_key].add_interval(start, end)
337+
intervals[merge_key].add_last_altered_ts(last_altered_ts)
342338
# Remove all pending restatement intervals recorded before the current interval has been added
343339
intervals[
344340
pending_restatement_interval_merge_key

tests/core/engine_adapter/integration/test_integration.py

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3848,13 +3848,24 @@ def test_external_model_freshness(ctx: TestContext, mocker: MockerFixture, tmp_p
38483848
if not adapter.SUPPORTS_EXTERNAL_MODEL_FRESHNESS:
38493849
pytest.skip("This test only runs for engines that support external model freshness")
38503850

3851-
def _assert_snapshot_last_altered_ts(context: Context, snapshot_id: str, timestamp: datetime):
3851+
def _assert_snapshot_last_altered_ts(
3852+
context: Context,
3853+
snapshot_id: str,
3854+
last_altered_ts: datetime,
3855+
dev_last_altered_ts: t.Optional[datetime] = None,
3856+
):
38523857
from sqlmesh.utils.date import to_datetime
38533858

38543859
snapshot = context.state_sync.get_snapshots([snapshot_id])[snapshot_id]
3855-
assert to_datetime(snapshot.last_altered_ts).replace(microsecond=0) == timestamp.replace(
3860+
3861+
assert to_datetime(snapshot.last_altered_ts).replace(
38563862
microsecond=0
3857-
)
3863+
) == last_altered_ts.replace(microsecond=0)
3864+
3865+
if dev_last_altered_ts:
3866+
assert to_datetime(snapshot.dev_last_altered_ts).replace(
3867+
microsecond=0
3868+
) == dev_last_altered_ts.replace(microsecond=0)
38583869

38593870
import sqlmesh
38603871

@@ -3944,14 +3955,14 @@ def _set_config(gateway: str, config: Config) -> None:
39443955
)
39453956

39463957
prod_snapshot_id = next(iter(prod_plan.context_diff.new_snapshots))
3947-
_assert_snapshot_last_altered_ts(context, prod_snapshot_id, prod_plan_ts)
3958+
_assert_snapshot_last_altered_ts(context, prod_snapshot_id, last_altered_ts=prod_plan_ts)
39483959

39493960
# Case 2: Model is NOT evaluated on run if external models are not fresh
39503961
_assert_model_evaluation(lambda: context.run(), was_evaluated=False, day_delta=1)
39513962

39523963
# Case 3: Differentiate last_altered_ts between snapshots with shared version
39533964
# For instance, creating a FORWARD_ONLY change in dev (reusing the version but creating a dev preview) should not cause
3954-
# the prod snapshot's last_altered_ts to be updated when fetched from the state sync
3965+
# any side effects to the prod snapshot's last_altered_ts hydration
39553966
model_path.write_text(model_path.read_text().replace("col1 * col2", "col1 + col2"))
39563967
context.load()
39573968
dev_plan_ts = now(minute_floor=False) + timedelta(days=2)
@@ -3962,8 +3973,13 @@ def _set_config(gateway: str, config: Config) -> None:
39623973

39633974
context.state_sync.clear_cache()
39643975
dev_snapshot_id = next(iter(dev_plan.context_diff.new_snapshots))
3965-
_assert_snapshot_last_altered_ts(context, dev_snapshot_id, dev_plan_ts)
3966-
_assert_snapshot_last_altered_ts(context, prod_snapshot_id, prod_plan_ts)
3976+
_assert_snapshot_last_altered_ts(
3977+
context,
3978+
dev_snapshot_id,
3979+
last_altered_ts=prod_plan_ts,
3980+
dev_last_altered_ts=dev_plan_ts,
3981+
)
3982+
_assert_snapshot_last_altered_ts(context, prod_snapshot_id, last_altered_ts=prod_plan_ts)
39673983

39683984
# Case 4: Model is evaluated on run if any external model is fresh
39693985
adapter.execute(f"INSERT INTO {external_table2} (col2) VALUES (3)", quote_identifiers=False)

0 commit comments

Comments
 (0)