Skip to content

Commit b2efee2

Browse files
committed
Differentiate last_altered_ts from dev_last_altered_ts
1 parent f627f57 commit b2efee2

File tree

4 files changed

+60
-23
lines changed

4 files changed

+60
-23
lines changed

sqlmesh/core/signal.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,20 @@ class signal(registry_decorator):
4242

4343
@signal()
4444
def freshness(batch: DatetimeRanges, snapshot: Snapshot, context: ExecutionContext) -> bool:
45+
deployability_index = context.deployability_index
4546
adapter = context.engine_adapter
46-
if not snapshot.last_altered_ts or not adapter.SUPPORTS_EXTERNAL_MODEL_FRESHNESS:
47+
48+
if not deployability_index or not adapter.SUPPORTS_EXTERNAL_MODEL_FRESHNESS:
49+
return True
50+
51+
last_altered_ts = (
52+
snapshot.last_altered_ts
53+
if deployability_index.is_deployable(snapshot)
54+
else snapshot.dev_last_altered_ts
55+
)
56+
if not last_altered_ts:
4757
return True
4858

49-
adapter = context.engine_adapter
5059
parent_snapshots = {context.snapshots[p.name] for p in snapshot.parents}
5160
if len(parent_snapshots) != len(snapshot.node.depends_on) or not all(
5261
p.is_external for p in parent_snapshots
@@ -57,7 +66,7 @@ def freshness(batch: DatetimeRanges, snapshot: Snapshot, context: ExecutionConte
5766
# Finding new data means that the upstream depedencies have been altered
5867
# since the last time the model was evaluated
5968
upstream_dep_has_new_data = any(
60-
upstream_last_altered_ts > snapshot.last_altered_ts
69+
upstream_last_altered_ts > last_altered_ts
6170
for upstream_last_altered_ts in adapter.get_external_model_freshness(
6271
[p.name for p in parent_snapshots]
6372
)

sqlmesh/core/snapshot/definition.py

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,7 @@ class SnapshotIntervals(PydanticModel):
186186
dev_intervals: Intervals = []
187187
pending_restatement_intervals: Intervals = []
188188
last_altered_ts: t.Optional[int] = None
189+
dev_last_altered_ts: t.Optional[int] = None
189190

190191
@property
191192
def snapshot_id(self) -> t.Optional[SnapshotId]:
@@ -206,6 +207,12 @@ def add_dev_interval(self, start: int, end: int) -> None:
206207
def add_pending_restatement_interval(self, start: int, end: int) -> None:
207208
self._add_interval(start, end, "pending_restatement_intervals")
208209

210+
def add_last_altered_ts(self, last_altered_ts: t.Optional[int]) -> None:
211+
self._add_last_altered_ts(last_altered_ts, "last_altered_ts")
212+
213+
def add_dev_last_altered_ts(self, last_altered_ts: t.Optional[int]) -> None:
214+
self._add_last_altered_ts(last_altered_ts, "dev_last_altered_ts")
215+
209216
def remove_interval(self, start: int, end: int) -> None:
210217
self._remove_interval(start, end, "intervals")
211218

@@ -225,6 +232,13 @@ def _add_interval(self, start: int, end: int, interval_attr: str) -> None:
225232
target_intervals = merge_intervals([*target_intervals, (start, end)])
226233
setattr(self, interval_attr, target_intervals)
227234

235+
def _add_last_altered_ts(
236+
self, last_altered_ts: t.Optional[int], last_altered_attr: str
237+
) -> None:
238+
if last_altered_ts:
239+
existing_last_altered_ts = getattr(self, last_altered_attr)
240+
setattr(self, last_altered_attr, max(existing_last_altered_ts or -1, last_altered_ts))
241+
228242
def _remove_interval(self, start: int, end: int, interval_attr: str) -> None:
229243
target_intervals = getattr(self, interval_attr)
230244
target_intervals = remove_interval(target_intervals, start, end)
@@ -687,6 +701,7 @@ class Snapshot(PydanticModel, SnapshotInfoMixin):
687701
# Physical table last modified timestamp, not to be confused with the "updated_ts" field
688702
# which is for the snapshot record itself
689703
last_altered_ts: t.Optional[int] = None
704+
dev_last_altered_ts: t.Optional[int] = None
690705

691706
@field_validator("ttl")
692707
@classmethod
@@ -726,13 +741,6 @@ def hydrate_with_intervals_by_version(
726741
for interval in snapshot_intervals:
727742
snapshot.merge_intervals(interval)
728743

729-
# Differentiate last_altered_ts between snapshots with shared version but
730-
# different dev versions e.g prod vs FORWARD_ONLY dev
731-
if snapshot.dev_version == interval.dev_version and interval.last_altered_ts:
732-
snapshot.last_altered_ts = max(
733-
snapshot.last_altered_ts or -1, interval.last_altered_ts
734-
)
735-
736744
result.append(snapshot)
737745

738746
return result
@@ -939,12 +947,20 @@ def merge_intervals(self, other: t.Union[Snapshot, SnapshotIntervals]) -> None:
939947
if not apply_effective_from or end <= effective_from_ts:
940948
self.add_interval(start, end)
941949

950+
if other.last_altered_ts:
951+
self.last_altered_ts = max(self.last_altered_ts or -1, other.last_altered_ts)
952+
942953
if self.dev_version == other.dev_version:
943954
# Merge dev intervals if the dev versions match which would mean
944955
# that this and the other snapshot are pointing to the same dev table.
945956
for start, end in other.dev_intervals:
946957
self.add_interval(start, end, is_dev=True)
947958

959+
if other.dev_last_altered_ts:
960+
self.dev_last_altered_ts = max(
961+
self.dev_last_altered_ts or -1, other.dev_last_altered_ts
962+
)
963+
948964
self.pending_restatement_intervals = merge_intervals(
949965
[*self.pending_restatement_intervals, *other.pending_restatement_intervals]
950966
)

sqlmesh/core/state_sync/db/interval.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -309,12 +309,6 @@ def _get_snapshot_intervals(
309309
identifier=identifier,
310310
version=version,
311311
dev_version=dev_version,
312-
last_altered_ts=last_altered_ts,
313-
)
314-
315-
if last_altered_ts:
316-
intervals[merge_key].last_altered_ts = max(
317-
intervals[merge_key].last_altered_ts or 0, last_altered_ts
318312
)
319313

320314
if pending_restatement_interval_merge_key not in intervals:
@@ -337,8 +331,10 @@ def _get_snapshot_intervals(
337331
else:
338332
if is_dev:
339333
intervals[merge_key].add_dev_interval(start, end)
334+
intervals[merge_key].add_dev_last_altered_ts(last_altered_ts)
340335
else:
341336
intervals[merge_key].add_interval(start, end)
337+
intervals[merge_key].add_last_altered_ts(last_altered_ts)
342338
# Remove all pending restatement intervals recorded before the current interval has been added
343339
intervals[
344340
pending_restatement_interval_merge_key

tests/core/engine_adapter/integration/test_integration.py

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3835,13 +3835,24 @@ def test_external_model_freshness(ctx: TestContext, mocker: MockerFixture, tmp_p
38353835
if not adapter.SUPPORTS_EXTERNAL_MODEL_FRESHNESS:
38363836
pytest.skip("This test only runs for engines that support external model freshness")
38373837

3838-
def _assert_snapshot_last_altered_ts(context: Context, snapshot_id: str, timestamp: datetime):
3838+
def _assert_snapshot_last_altered_ts(
3839+
context: Context,
3840+
snapshot_id: str,
3841+
last_altered_ts: datetime,
3842+
dev_last_altered_ts: t.Optional[datetime] = None,
3843+
):
38393844
from sqlmesh.utils.date import to_datetime
38403845

38413846
snapshot = context.state_sync.get_snapshots([snapshot_id])[snapshot_id]
3842-
assert to_datetime(snapshot.last_altered_ts).replace(microsecond=0) == timestamp.replace(
3847+
3848+
assert to_datetime(snapshot.last_altered_ts).replace(
38433849
microsecond=0
3844-
)
3850+
) == last_altered_ts.replace(microsecond=0)
3851+
3852+
if dev_last_altered_ts:
3853+
assert to_datetime(snapshot.dev_last_altered_ts).replace(
3854+
microsecond=0
3855+
) == dev_last_altered_ts.replace(microsecond=0)
38453856

38463857
import sqlmesh
38473858

@@ -3931,14 +3942,14 @@ def _set_config(gateway: str, config: Config) -> None:
39313942
)
39323943

39333944
prod_snapshot_id = next(iter(prod_plan.context_diff.new_snapshots))
3934-
_assert_snapshot_last_altered_ts(context, prod_snapshot_id, prod_plan_ts)
3945+
_assert_snapshot_last_altered_ts(context, prod_snapshot_id, last_altered_ts=prod_plan_ts)
39353946

39363947
# Case 2: Model is NOT evaluated on run if external models are not fresh
39373948
_assert_model_evaluation(lambda: context.run(), was_evaluated=False, day_delta=1)
39383949

39393950
# Case 3: Differentiate last_altered_ts between snapshots with shared version
39403951
# For instance, creating a FORWARD_ONLY change in dev (reusing the version but creating a dev preview) should not cause
3941-
# the prod snapshot's last_altered_ts to be updated when fetched from the state sync
3952+
# any side effects to the prod snapshot's last_altered_ts hydration
39423953
model_path.write_text(model_path.read_text().replace("col1 * col2", "col1 + col2"))
39433954
context.load()
39443955
dev_plan_ts = now(minute_floor=False) + timedelta(days=2)
@@ -3949,8 +3960,13 @@ def _set_config(gateway: str, config: Config) -> None:
39493960

39503961
context.state_sync.clear_cache()
39513962
dev_snapshot_id = next(iter(dev_plan.context_diff.new_snapshots))
3952-
_assert_snapshot_last_altered_ts(context, dev_snapshot_id, dev_plan_ts)
3953-
_assert_snapshot_last_altered_ts(context, prod_snapshot_id, prod_plan_ts)
3963+
_assert_snapshot_last_altered_ts(
3964+
context,
3965+
dev_snapshot_id,
3966+
last_altered_ts=prod_plan_ts,
3967+
dev_last_altered_ts=dev_plan_ts,
3968+
)
3969+
_assert_snapshot_last_altered_ts(context, prod_snapshot_id, last_altered_ts=prod_plan_ts)
39543970

39553971
# Case 4: Model is evaluated on run if any external model is fresh
39563972
adapter.execute(f"INSERT INTO {external_table2} (col2) VALUES (3)", quote_identifiers=False)

0 commit comments

Comments
 (0)