Skip to content

Commit b1286df

Browse files
authored
Merge branch 'main' into indirect_change_to_materialized_view_is_breaking
2 parents ea775fb + 50aee2c commit b1286df

25 files changed

+633
-49
lines changed

pyproject.toml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ dependencies = [
2424
"requests",
2525
"rich[jupyter]",
2626
"ruamel.yaml",
27-
"sqlglot[rs]~=27.14.0",
27+
"sqlglot[rs]~=27.16.3",
2828
"tenacity",
2929
"time-machine",
3030
"json-stream"
@@ -56,7 +56,7 @@ dev = [
5656
"agate",
5757
"beautifulsoup4",
5858
"clickhouse-connect",
59-
"cryptography",
59+
"cryptography<46.0.0",
6060
"databricks-sql-connector",
6161
"dbt-bigquery",
6262
"dbt-core",
@@ -119,7 +119,7 @@ postgres = ["psycopg2"]
119119
redshift = ["redshift_connector"]
120120
slack = ["slack_sdk"]
121121
snowflake = [
122-
"cryptography",
122+
"cryptography<46.0.0",
123123
"snowflake-connector-python[pandas,secure-local-storage]",
124124
"snowflake-snowpark-python",
125125
]
@@ -135,7 +135,7 @@ lsp = [
135135
# Duplicate of web
136136
"fastapi==0.115.5",
137137
"watchfiles>=0.19.0",
138-
"uvicorn[standard]==0.22.0",
138+
# "uvicorn[standard]==0.22.0",
139139
"sse-starlette>=0.2.2",
140140
"pyarrow",
141141
# For lsp

sqlmesh/core/engine_adapter/base.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1044,6 +1044,7 @@ def clone_table(
10441044
target_table_name: TableName,
10451045
source_table_name: TableName,
10461046
replace: bool = False,
1047+
exists: bool = True,
10471048
clone_kwargs: t.Optional[t.Dict[str, t.Any]] = None,
10481049
**kwargs: t.Any,
10491050
) -> None:
@@ -1053,6 +1054,7 @@ def clone_table(
10531054
target_table_name: The name of the table that should be created.
10541055
source_table_name: The name of the source table that should be cloned.
10551056
replace: Whether or not to replace an existing table.
1057+
exists: Indicates whether to include the IF NOT EXISTS check.
10561058
"""
10571059
if not self.SUPPORTS_CLONING:
10581060
raise NotImplementedError(f"Engine does not support cloning: {type(self)}")
@@ -1063,6 +1065,7 @@ def clone_table(
10631065
this=exp.to_table(target_table_name),
10641066
kind="TABLE",
10651067
replace=replace,
1068+
exists=exists,
10661069
clone=exp.Clone(
10671070
this=exp.to_table(source_table_name),
10681071
**(clone_kwargs or {}),

sqlmesh/core/engine_adapter/databricks.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,7 @@ def clone_table(
299299
target_table_name: TableName,
300300
source_table_name: TableName,
301301
replace: bool = False,
302+
exists: bool = True,
302303
clone_kwargs: t.Optional[t.Dict[str, t.Any]] = None,
303304
**kwargs: t.Any,
304305
) -> None:

sqlmesh/core/engine_adapter/snowflake.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -610,6 +610,7 @@ def clone_table(
610610
target_table_name: TableName,
611611
source_table_name: TableName,
612612
replace: bool = False,
613+
exists: bool = True,
613614
clone_kwargs: t.Optional[t.Dict[str, t.Any]] = None,
614615
**kwargs: t.Any,
615616
) -> None:

sqlmesh/core/model/definition.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2866,6 +2866,13 @@ def render_field_value(value: t.Any) -> t.Any:
28662866
for key, value in field_value.items():
28672867
if key in RUNTIME_RENDERED_MODEL_FIELDS:
28682868
rendered_dict[key] = parse_strings_with_macro_refs(value, dialect)
2869+
elif (
2870+
# don't parse kind auto_restatement_cron="@..." kwargs (e.g. @daily) into MacroVar
2871+
key == "auto_restatement_cron"
2872+
and isinstance(value, str)
2873+
and value.lower() in CRON_SHORTCUTS
2874+
):
2875+
rendered_dict[key] = value
28692876
elif (rendered := render_field_value(value)) is not None:
28702877
rendered_dict[key] = rendered
28712878

sqlmesh/core/plan/common.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,9 @@ def should_force_rebuild(old: Snapshot, new: Snapshot) -> bool:
1616
if new.is_view and new.is_indirect_non_breaking and not new.is_forward_only:
1717
# View models always need to be rebuilt to reflect updated upstream dependencies
1818
return True
19-
if new.is_seed:
19+
if new.is_seed and not new.is_metadata:
2020
# Seed models always need to be rebuilt to reflect changes in the seed file
21+
# Unless only their metadata has been updated (eg description added) and the seed file has not been touched
2122
return True
2223
return is_breaking_kind_change(old, new)
2324

sqlmesh/core/renderer.py

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from functools import partial
77
from pathlib import Path
88

9-
from sqlglot import exp, parse
9+
from sqlglot import exp, Dialect
1010
from sqlglot.errors import SqlglotError
1111
from sqlglot.helper import ensure_list
1212
from sqlglot.optimizer.annotate_types import annotate_types
@@ -249,15 +249,24 @@ def _resolve_table(table: str | exp.Table) -> str:
249249
) from ex
250250

251251
if rendered_expression.strip():
252-
try:
253-
expressions = [e for e in parse(rendered_expression, read=self._dialect) if e]
254-
255-
if not expressions:
256-
raise ConfigError(f"Failed to parse an expression:\n{self._expression}")
257-
except Exception as ex:
258-
raise ConfigError(
259-
f"Could not parse the rendered jinja at '{self._path}'.\n{ex}"
260-
) from ex
252+
# ensure there is actual SQL and not just comments and non-SQL jinja
253+
dialect = Dialect.get_or_raise(self._dialect)
254+
tokens = dialect.tokenize(rendered_expression)
255+
256+
if tokens:
257+
try:
258+
expressions = [
259+
e for e in dialect.parser().parse(tokens, rendered_expression) if e
260+
]
261+
262+
if not expressions:
263+
raise ConfigError(
264+
f"Failed to parse an expression:\n{rendered_expression}"
265+
)
266+
except Exception as ex:
267+
raise ConfigError(
268+
f"Could not parse the rendered jinja at '{self._path}'.\n{ex}"
269+
) from ex
261270

262271
if this_model:
263272
render_kwargs["this_model"] = this_model

sqlmesh/core/snapshot/definition.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1480,7 +1480,7 @@ def expiration_ts(self) -> int:
14801480
@property
14811481
def supports_schema_migration_in_prod(self) -> bool:
14821482
"""Returns whether or not this snapshot supports schema migration when deployed to production."""
1483-
return self.is_paused and self.is_model and not self.is_symbolic
1483+
return self.is_paused and self.is_model and not self.is_symbolic and not self.is_seed
14841484

14851485
@property
14861486
def requires_schema_migration_in_prod(self) -> bool:

sqlmesh/core/snapshot/evaluator.py

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
from sqlglot import exp, select
3434
from sqlglot.executor import execute
35+
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_not_exception_type
3536

3637
from sqlmesh.core import constants as c
3738
from sqlmesh.core import dialect as d
@@ -76,6 +77,7 @@
7677
from sqlmesh.utils.errors import (
7778
ConfigError,
7879
DestructiveChangeError,
80+
MigrationNotSupportedError,
7981
SQLMeshError,
8082
format_destructive_change_msg,
8183
format_additive_change_msg,
@@ -492,7 +494,7 @@ def migrate(
492494
with self.concurrent_context():
493495
# Only migrate snapshots for which there's an existing data object
494496
concurrent_apply_to_snapshots(
495-
snapshots_by_name.values(),
497+
target_snapshots,
496498
lambda s: self._migrate_snapshot(
497499
s,
498500
snapshots_by_name,
@@ -761,7 +763,7 @@ def _evaluate_snapshot(
761763
snapshots=snapshots,
762764
deployability_index=deployability_index,
763765
render_kwargs=create_render_kwargs,
764-
rendered_physical_properties=rendered_physical_properties,
766+
rendered_physical_properties=rendered_physical_properties.copy(),
765767
allow_destructive_snapshots=allow_destructive_snapshots,
766768
allow_additive_snapshots=allow_additive_snapshots,
767769
)
@@ -774,7 +776,7 @@ def _evaluate_snapshot(
774776
is_table_deployable=is_snapshot_deployable,
775777
deployability_index=deployability_index,
776778
create_render_kwargs=create_render_kwargs,
777-
rendered_physical_properties=rendered_physical_properties,
779+
rendered_physical_properties=rendered_physical_properties.copy(),
778780
dry_run=False,
779781
run_pre_post_statements=False,
780782
)
@@ -865,6 +867,7 @@ def create_snapshot(
865867
rendered_physical_properties=rendered_physical_properties,
866868
allow_destructive_snapshots=allow_destructive_snapshots,
867869
allow_additive_snapshots=allow_additive_snapshots,
870+
run_pre_post_statements=True,
868871
)
869872
else:
870873
is_table_deployable = deployability_index.is_deployable(snapshot)
@@ -1024,6 +1027,7 @@ def _clone_snapshot_in_dev(
10241027
rendered_physical_properties: t.Dict[str, exp.Expression],
10251028
allow_destructive_snapshots: t.Set[str],
10261029
allow_additive_snapshots: t.Set[str],
1030+
run_pre_post_statements: bool = False,
10271031
) -> None:
10281032
adapter = self.get_adapter(snapshot.model.gateway)
10291033

@@ -1035,7 +1039,6 @@ def _clone_snapshot_in_dev(
10351039
adapter.clone_table(
10361040
target_table_name,
10371041
snapshot.table_name(),
1038-
replace=True,
10391042
rendered_physical_properties=rendered_physical_properties,
10401043
)
10411044
self._migrate_target_table(
@@ -1047,6 +1050,7 @@ def _clone_snapshot_in_dev(
10471050
rendered_physical_properties=rendered_physical_properties,
10481051
allow_destructive_snapshots=allow_destructive_snapshots,
10491052
allow_additive_snapshots=allow_additive_snapshots,
1053+
run_pre_post_statements=run_pre_post_statements,
10501054
)
10511055
except Exception:
10521056
adapter.drop_table(target_table_name)
@@ -1111,6 +1115,15 @@ def _migrate_snapshot(
11111115
dry_run=True,
11121116
)
11131117

1118+
# Retry in case when the table is migrated concurrently from another plan application
1119+
@retry(
1120+
reraise=True,
1121+
stop=stop_after_attempt(5),
1122+
wait=wait_exponential(min=1, max=16),
1123+
retry=retry_if_not_exception_type(
1124+
(DestructiveChangeError, AdditiveChangeError, MigrationNotSupportedError)
1125+
),
1126+
)
11141127
def _migrate_target_table(
11151128
self,
11161129
target_table_name: str,
@@ -1440,8 +1453,9 @@ def _can_clone(self, snapshot: Snapshot, deployability_index: DeployabilityIndex
14401453
and adapter.SUPPORTS_CLONING
14411454
# managed models cannot have their schema mutated because theyre based on queries, so clone + alter wont work
14421455
and not snapshot.is_managed
1443-
# If the deployable table is missing we can't clone it
14441456
and not deployability_index.is_deployable(snapshot)
1457+
# If the deployable table is missing we can't clone it
1458+
and adapter.table_exists(snapshot.table_name())
14451459
)
14461460

14471461
def _get_data_objects(
@@ -2671,7 +2685,7 @@ def migrate(
26712685
)
26722686
if len(potential_alter_operations) > 0:
26732687
# this can happen if a user changes a managed model and deliberately overrides a plan to be forward only, eg `sqlmesh plan --forward-only`
2674-
raise SQLMeshError(
2688+
raise MigrationNotSupportedError(
26752689
f"The schema of the managed model '{target_table_name}' cannot be updated in a forward-only fashion."
26762690
)
26772691

sqlmesh/core/test/definition.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -644,16 +644,16 @@ def _create_df(
644644
return self._execute(query)
645645

646646
rows = values["rows"]
647+
columns_str: t.Optional[t.List[str]] = None
647648
if columns:
649+
columns_str = [str(c) for c in columns]
648650
referenced_columns = list(dict.fromkeys(col for row in rows for col in row))
649651
_raise_if_unexpected_columns(columns, referenced_columns)
650652

651653
if partial:
652-
columns = referenced_columns
654+
columns_str = [c for c in columns_str if c in referenced_columns]
653655

654-
return pd.DataFrame.from_records(
655-
rows, columns=[str(c) for c in columns] if columns else None
656-
)
656+
return pd.DataFrame.from_records(rows, columns=columns_str)
657657

658658
def _add_missing_columns(
659659
self, query: exp.Query, all_columns: t.Optional[t.Collection[str]] = None

0 commit comments

Comments (0)