Skip to content

Commit f547e9a

Browse files
authored
Merge branch 'TobikoData:main' into doris-adapter
2 parents fee745c + 119b623 commit f547e9a

32 files changed

+1055
-279
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ dependencies = [
2424
"requests",
2525
"rich[jupyter]",
2626
"ruamel.yaml",
27-
"sqlglot[rs]~=27.27.0",
27+
"sqlglot[rs]~=27.28.0",
2828
"tenacity",
2929
"time-machine",
3030
"json-stream"

sqlmesh/core/context.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,8 @@
154154
)
155155
from sqlmesh.core.snapshot import Node
156156

157+
from sqlmesh.core.snapshot.definition import Intervals
158+
157159
ModelOrSnapshot = t.Union[str, Model, Snapshot]
158160
NodeOrSnapshot = t.Union[str, Model, StandaloneAudit, Snapshot]
159161

@@ -276,6 +278,7 @@ def __init__(
276278
default_dialect: t.Optional[str] = None,
277279
default_catalog: t.Optional[str] = None,
278280
is_restatement: t.Optional[bool] = None,
281+
parent_intervals: t.Optional[Intervals] = None,
279282
variables: t.Optional[t.Dict[str, t.Any]] = None,
280283
blueprint_variables: t.Optional[t.Dict[str, t.Any]] = None,
281284
):
@@ -287,6 +290,7 @@ def __init__(
287290
self._variables = variables or {}
288291
self._blueprint_variables = blueprint_variables or {}
289292
self._is_restatement = is_restatement
293+
self._parent_intervals = parent_intervals
290294

291295
@property
292296
def default_dialect(self) -> t.Optional[str]:
@@ -315,6 +319,10 @@ def gateway(self) -> t.Optional[str]:
315319
def is_restatement(self) -> t.Optional[bool]:
316320
return self._is_restatement
317321

322+
@property
323+
def parent_intervals(self) -> t.Optional[Intervals]:
324+
return self._parent_intervals
325+
318326
def var(self, var_name: str, default: t.Optional[t.Any] = None) -> t.Optional[t.Any]:
319327
"""Returns a variable value."""
320328
return self._variables.get(var_name.lower(), default)

sqlmesh/core/engine_adapter/base.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -551,11 +551,13 @@ def replace_query(
551551
target_table,
552552
source_queries,
553553
target_columns_to_types,
554+
**kwargs,
554555
)
555556
return self._insert_overwrite_by_condition(
556557
target_table,
557558
source_queries,
558559
target_columns_to_types,
560+
**kwargs,
559561
)
560562

561563
def create_index(
@@ -1614,7 +1616,7 @@ def _insert_overwrite_by_time_partition(
16141616
**kwargs: t.Any,
16151617
) -> None:
16161618
return self._insert_overwrite_by_condition(
1617-
table_name, source_queries, target_columns_to_types, where
1619+
table_name, source_queries, target_columns_to_types, where, **kwargs
16181620
)
16191621

16201622
def _values_to_sql(

sqlmesh/core/engine_adapter/mssql.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -423,7 +423,9 @@ def _insert_overwrite_by_condition(
423423
insert_overwrite_strategy_override: t.Optional[InsertOverwriteStrategy] = None,
424424
**kwargs: t.Any,
425425
) -> None:
426-
if not where or where == exp.true():
426+
# note that this is passed as table_properties here rather than physical_properties
427+
use_merge_strategy = kwargs.get("table_properties", {}).get("mssql_merge_exists")
428+
if (not where or where == exp.true()) and not use_merge_strategy:
427429
# this is a full table replacement, call the base strategy to do DELETE+INSERT
428430
# which will result in TRUNCATE+INSERT due to how we have overridden self.delete_from()
429431
return EngineAdapter._insert_overwrite_by_condition(
@@ -436,7 +438,7 @@ def _insert_overwrite_by_condition(
436438
**kwargs,
437439
)
438440

439-
# For actual conditional overwrites, use MERGE from InsertOverwriteWithMergeMixin
441+
# For conditional overwrites, or when mssql_merge_exists is set, use MERGE
440442
return super()._insert_overwrite_by_condition(
441443
table_name=table_name,
442444
source_queries=source_queries,

sqlmesh/core/lineage.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ def lineage(
6666
scope=scope,
6767
trim_selects=trim_selects,
6868
dialect=model.dialect,
69+
copy=False,
6970
)
7071

7172

sqlmesh/core/plan/definition.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ class Plan(PydanticModel, frozen=True):
6363
restatements: t.Dict[SnapshotId, Interval]
6464
"""
6565
All models being restated, which are typically the explicitly selected ones + their downstream dependencies.
66-
66+
6767
Note that dev previews are also considered restatements, so :selected_models_to_restate can be empty
6868
while :restatements is still populated with dev previews
6969
"""
@@ -213,8 +213,8 @@ def environment(self) -> Environment:
213213

214214
snapshots_by_name = self.context_diff.snapshots_by_name
215215
snapshots = [s.table_info for s in self.snapshots.values()]
216-
promoted_snapshot_ids = None
217-
if self.is_dev and not self.include_unmodified:
216+
promotable_snapshot_ids = None
217+
if self.is_dev:
218218
if self.selected_models_to_backfill is not None:
219219
# Only promote models that have been explicitly selected for backfill.
220220
promotable_snapshot_ids = {
@@ -225,12 +225,14 @@ def environment(self) -> Environment:
225225
if m in snapshots_by_name
226226
],
227227
}
228-
else:
228+
elif not self.include_unmodified:
229229
promotable_snapshot_ids = self.context_diff.promotable_snapshot_ids.copy()
230230

231-
promoted_snapshot_ids = [
232-
s.snapshot_id for s in snapshots if s.snapshot_id in promotable_snapshot_ids
233-
]
231+
promoted_snapshot_ids = (
232+
[s.snapshot_id for s in snapshots if s.snapshot_id in promotable_snapshot_ids]
233+
if promotable_snapshot_ids is not None
234+
else None
235+
)
234236

235237
previous_finalized_snapshots = (
236238
self.context_diff.environment_snapshots

sqlmesh/core/scheduler.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -352,7 +352,7 @@ def batch_intervals(
352352
)
353353
for snapshot, intervals in merged_intervals.items()
354354
}
355-
snapshot_batches = {}
355+
snapshot_batches: t.Dict[Snapshot, Intervals] = {}
356356
all_unready_intervals: t.Dict[str, set[Interval]] = {}
357357
for snapshot_id in dag:
358358
if snapshot_id not in snapshot_intervals:
@@ -364,13 +364,22 @@ def batch_intervals(
364364

365365
adapter = self.snapshot_evaluator.get_adapter(snapshot.model_gateway)
366366

367+
parent_intervals: Intervals = []
368+
for parent_id in snapshot.parents:
369+
parent_snapshot, _ = snapshot_intervals.get(parent_id, (None, []))
370+
if not parent_snapshot or parent_snapshot.is_external:
371+
continue
372+
373+
parent_intervals.extend(snapshot_batches[parent_snapshot])
374+
367375
context = ExecutionContext(
368376
adapter,
369377
self.snapshots_by_name,
370378
deployability_index,
371379
default_dialect=adapter.dialect,
372380
default_catalog=self.default_catalog,
373381
is_restatement=is_restatement,
382+
parent_intervals=parent_intervals,
374383
)
375384

376385
intervals = self._check_ready_intervals(
@@ -538,6 +547,10 @@ def run_node(node: SchedulingUnit) -> None:
538547
execution_time=execution_time,
539548
)
540549
else:
550+
# If batch_index > 0, then the target table must exist since the first batch would have created it
551+
target_table_exists = (
552+
snapshot.snapshot_id not in snapshots_to_create or node.batch_index > 0
553+
)
541554
audit_results = self.evaluate(
542555
snapshot=snapshot,
543556
environment_naming_info=environment_naming_info,
@@ -548,7 +561,7 @@ def run_node(node: SchedulingUnit) -> None:
548561
batch_index=node.batch_index,
549562
allow_destructive_snapshots=allow_destructive_snapshots,
550563
allow_additive_snapshots=allow_additive_snapshots,
551-
target_table_exists=snapshot.snapshot_id not in snapshots_to_create,
564+
target_table_exists=target_table_exists,
552565
selected_models=selected_models,
553566
)
554567

sqlmesh/core/signal.py

Lines changed: 35 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import typing as t
44
from sqlmesh.utils import UniqueKeyDict, registry_decorator
5+
from sqlmesh.utils.errors import MissingSourceError
56

67
if t.TYPE_CHECKING:
78
from sqlmesh.core.context import ExecutionContext
@@ -42,7 +43,16 @@ class signal(registry_decorator):
4243

4344

4445
@signal()
45-
def freshness(batch: DatetimeRanges, snapshot: Snapshot, context: ExecutionContext) -> bool:
46+
def freshness(
47+
batch: DatetimeRanges,
48+
snapshot: Snapshot,
49+
context: ExecutionContext,
50+
) -> bool:
51+
"""
52+
Implements model freshness as a signal, i.e. it considers this model to be fresh if:
53+
- Any upstream SQLMesh model has available intervals to compute, i.e. is fresh
54+
- Any upstream external model has been altered since the last time the model was evaluated
55+
"""
4656
adapter = context.engine_adapter
4757
if context.is_restatement or not adapter.SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS:
4858
return True
@@ -54,24 +64,35 @@ def freshness(batch: DatetimeRanges, snapshot: Snapshot, context: ExecutionConte
5464
if deployability_index.is_deployable(snapshot)
5565
else snapshot.dev_last_altered_ts
5666
)
67+
5768
if not last_altered_ts:
5869
return True
5970

6071
parent_snapshots = {context.snapshots[p.name] for p in snapshot.parents}
61-
if len(parent_snapshots) != len(snapshot.node.depends_on) or not all(
62-
p.is_external for p in parent_snapshots
63-
):
64-
# The mismatch can happen if e.g an external model is not registered in the project
72+
73+
upstream_parent_snapshots = {p for p in parent_snapshots if not p.is_external}
74+
external_parents = snapshot.node.depends_on - {p.name for p in upstream_parent_snapshots}
75+
76+
if context.parent_intervals:
77+
# At least one upstream SQLMesh model has intervals to compute (i.e. is fresh),
78+
# so the current model is considered fresh too
6579
return True
6680

67-
# Finding new data means that the upstream depedencies have been altered
68-
# since the last time the model was evaluated
69-
upstream_dep_has_new_data = any(
70-
upstream_last_altered_ts > last_altered_ts
71-
for upstream_last_altered_ts in adapter.get_table_last_modified_ts(
72-
[p.name for p in parent_snapshots]
81+
if external_parents:
82+
external_last_altered_timestamps = adapter.get_table_last_modified_ts(
83+
list(external_parents)
84+
)
85+
86+
if len(external_last_altered_timestamps) != len(external_parents):
87+
raise MissingSourceError(
88+
f"Expected {len(external_parents)} sources to be present, but got {len(external_last_altered_timestamps)}."
89+
)
90+
91+
# Finding new data means that the upstream dependencies have been altered
92+
# since the last time the model was evaluated
93+
return any(
94+
external_last_altered_ts > last_altered_ts
95+
for external_last_altered_ts in external_last_altered_timestamps
7396
)
74-
)
7597

76-
# Returning true is a no-op, returning False nullifies the batch so the model will not be evaluated.
77-
return upstream_dep_has_new_data
98+
return False

sqlmesh/core/snapshot/definition.py

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2081,16 +2081,20 @@ def missing_intervals(
20812081
continue
20822082
snapshot_end_date = existing_interval_end
20832083

2084+
snapshot_start_date = max(
2085+
to_datetime(snapshot_start_date),
2086+
to_datetime(start_date(snapshot, snapshots, cache, relative_to=snapshot_end_date)),
2087+
)
2088+
if snapshot_start_date > to_datetime(snapshot_end_date):
2089+
continue
2090+
20842091
missing_interval_end_date = snapshot_end_date
20852092
node_end_date = snapshot.node.end
20862093
if node_end_date and (to_datetime(node_end_date) < to_datetime(snapshot_end_date)):
20872094
missing_interval_end_date = node_end_date
20882095

20892096
intervals = snapshot.missing_intervals(
2090-
max(
2091-
to_datetime(snapshot_start_date),
2092-
to_datetime(start_date(snapshot, snapshots, cache, relative_to=snapshot_end_date)),
2093-
),
2097+
snapshot_start_date,
20942098
missing_interval_end_date,
20952099
execution_time=execution_time,
20962100
deployability_index=deployability_index,
@@ -2295,14 +2299,16 @@ def start_date(
22952299
if not isinstance(snapshots, dict):
22962300
snapshots = {snapshot.snapshot_id: snapshot for snapshot in snapshots}
22972301

2298-
earliest = snapshot.node.cron_prev(snapshot.node.cron_floor(relative_to or now()))
2299-
2300-
for parent in snapshot.parents:
2301-
if parent in snapshots:
2302-
earliest = min(
2303-
earliest,
2304-
start_date(snapshots[parent], snapshots, cache=cache, relative_to=relative_to),
2305-
)
2302+
parent_starts = [
2303+
start_date(snapshots[parent], snapshots, cache=cache, relative_to=relative_to)
2304+
for parent in snapshot.parents
2305+
if parent in snapshots
2306+
]
2307+
earliest = (
2308+
min(parent_starts)
2309+
if parent_starts
2310+
else snapshot.node.cron_prev(snapshot.node.cron_floor(relative_to or now()))
2311+
)
23062312

23072313
cache[key] = earliest
23082314
return earliest

0 commit comments

Comments
 (0)