Skip to content

Commit 864aaa2

Browse files
authored
Merge branch 'main' into jo/warn_on_variabla_render_failure
2 parents 28a5f0e + 97c6a12 commit 864aaa2

File tree

11 files changed

+141
-18
lines changed

11 files changed

+141
-18
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ dependencies = [
2424
"requests",
2525
"rich[jupyter]",
2626
"ruamel.yaml",
27-
"sqlglot[rs]~=27.16.3",
27+
"sqlglot[rs]~=27.17.0",
2828
"tenacity",
2929
"time-machine",
3030
"json-stream"

sqlmesh/core/engine_adapter/bigquery.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -169,17 +169,18 @@ def _df_to_source_queries(
169169
)
170170

171171
def query_factory() -> Query:
172-
if bigframes_pd and isinstance(df, bigframes_pd.DataFrame):
173-
df.to_gbq(
172+
ordered_df = df[list(source_columns_to_types)]
173+
if bigframes_pd and isinstance(ordered_df, bigframes_pd.DataFrame):
174+
ordered_df.to_gbq(
174175
f"{temp_bq_table.project}.{temp_bq_table.dataset_id}.{temp_bq_table.table_id}",
175176
if_exists="replace",
176177
)
177178
elif not self.table_exists(temp_table):
178179
# Make mypy happy
179-
assert isinstance(df, pd.DataFrame)
180+
assert isinstance(ordered_df, pd.DataFrame)
180181
self._db_call(self.client.create_table, table=temp_bq_table, exists_ok=False)
181182
result = self.__load_pandas_to_table(
182-
temp_bq_table, df, source_columns_to_types, replace=False
183+
temp_bq_table, ordered_df, source_columns_to_types, replace=False
183184
)
184185
if result.errors:
185186
raise SQLMeshError(result.errors)

sqlmesh/core/engine_adapter/snowflake.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,8 @@ def query_factory() -> Query:
378378
elif isinstance(df, pd.DataFrame):
379379
from snowflake.connector.pandas_tools import write_pandas
380380

381+
ordered_df = df[list(source_columns_to_types)]
382+
381383
# Workaround for https://github.com/snowflakedb/snowflake-connector-python/issues/1034
382384
# The above issue has already been fixed upstream, but we keep the following
383385
# line anyway in order to support a wider range of Snowflake versions.
@@ -388,16 +390,16 @@ def query_factory() -> Query:
388390

389391
# See: https://stackoverflow.com/a/75627721
390392
for column, kind in source_columns_to_types.items():
391-
if is_datetime64_any_dtype(df.dtypes[column]):
393+
if is_datetime64_any_dtype(ordered_df.dtypes[column]):
392394
if kind.is_type("date"): # type: ignore
393-
df[column] = pd.to_datetime(df[column]).dt.date # type: ignore
394-
elif getattr(df.dtypes[column], "tz", None) is not None: # type: ignore
395-
df[column] = pd.to_datetime(df[column]).dt.strftime(
395+
ordered_df[column] = pd.to_datetime(ordered_df[column]).dt.date # type: ignore
396+
elif getattr(ordered_df.dtypes[column], "tz", None) is not None: # type: ignore
397+
ordered_df[column] = pd.to_datetime(ordered_df[column]).dt.strftime(
396398
"%Y-%m-%d %H:%M:%S.%f%z"
397399
) # type: ignore
398400
# https://github.com/snowflakedb/snowflake-connector-python/issues/1677
399401
else: # type: ignore
400-
df[column] = pd.to_datetime(df[column]).dt.strftime(
402+
ordered_df[column] = pd.to_datetime(ordered_df[column]).dt.strftime(
401403
"%Y-%m-%d %H:%M:%S.%f"
402404
) # type: ignore
403405

@@ -407,7 +409,7 @@ def query_factory() -> Query:
407409

408410
write_pandas(
409411
self._connection_pool.get(),
410-
df,
412+
ordered_df,
411413
temp_table.name,
412414
schema=temp_table.db or None,
413415
database=database.sql(dialect=self.dialect) if database else None,

sqlmesh/core/plan/common.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,11 @@ def should_force_rebuild(old: Snapshot, new: Snapshot) -> bool:
1616
if new.is_view and new.is_indirect_non_breaking and not new.is_forward_only:
1717
# View models always need to be rebuilt to reflect updated upstream dependencies
1818
return True
19-
if new.is_seed and not new.is_metadata:
19+
if new.is_seed and not (
20+
new.is_metadata
21+
and new.previous_version
22+
and new.previous_version.snapshot_id(new.name) == old.snapshot_id
23+
):
2024
# Seed models always need to be rebuilt to reflect changes in the seed file
2125
# Unless only their metadata has been updated (eg description added) and the seed file has not been touched
2226
return True

sqlmesh/core/plan/stages.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ def build(self, plan: EvaluatablePlan) -> t.List[PlanStage]:
268268
before_promote_snapshots = {
269269
s.snapshot_id
270270
for s in snapshots.values()
271-
if deployability_index.is_representative(s)
271+
if (deployability_index.is_representative(s) or s.is_seed)
272272
and plan.is_selected_for_backfill(s.name)
273273
}
274274
after_promote_snapshots = all_selected_for_backfill_snapshots - before_promote_snapshots

sqlmesh/core/snapshot/evaluator.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1141,10 +1141,10 @@ def _migrate_target_table(
11411141
) -> None:
11421142
adapter = self.get_adapter(snapshot.model.gateway)
11431143

1144-
target_table = exp.to_table(target_table_name)
1145-
target_table.this.set("this", f"{target_table.name}_schema_tmp")
1144+
tmp_table = exp.to_table(target_table_name)
1145+
tmp_table.this.set("this", f"{tmp_table.name}_schema_tmp")
1146+
tmp_table_name = tmp_table.sql()
11461147

1147-
tmp_table_name = target_table.sql()
11481148
if snapshot.is_materialized:
11491149
self._execute_create(
11501150
snapshot=snapshot,
@@ -2185,6 +2185,18 @@ def create(
21852185
self.adapter.drop_table(table_name)
21862186
raise
21872187

2188+
def migrate(
    self,
    target_table_name: str,
    source_table_name: str,
    snapshot: Snapshot,
    *,
    ignore_destructive: bool,
    ignore_additive: bool,
    **kwargs: t.Any,
) -> None:
    """Reject migration requests for seeds.

    None of the arguments are inspected; the call fails unconditionally.

    Raises:
        NotImplementedError: Always.
    """
    message = "Seeds do not support migrations."
    raise NotImplementedError(message)
2199+
21882200
def insert(
21892201
self,
21902202
table_name: str,

sqlmesh/utils/jinja.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,20 @@ def extract_macro_references_and_variables(
206206
return macro_references, variables
207207

208208

209+
def sort_dict_recursive(
    item: t.Dict[str, t.Any],
) -> t.Dict[str, t.Any]:
    """Return a copy of ``item`` whose keys are in sorted order at every nesting level.

    Values are handled by type:
      * ``dict`` values are processed recursively.
      * ``list`` values are replaced by a sorted copy. If the elements cannot be
        ordered against each other (e.g. dicts, ``None`` or mixed types), the
        list is copied in its original order instead of raising.
      * Any other value is stored unchanged.

    Args:
        item: The mapping to normalise. It is not mutated.

    Returns:
        A new dict with keys inserted in sorted order.
    """
    sorted_dict: t.Dict[str, t.Any] = {}
    for k, v in sorted(item.items()):
        if isinstance(v, list):
            try:
                sorted_dict[k] = sorted(v)
            except TypeError:
                # Elements are not mutually orderable; keep the caller's order
                # rather than failing on otherwise valid data.
                sorted_dict[k] = list(v)
        elif isinstance(v, dict):
            sorted_dict[k] = sort_dict_recursive(v)
        else:
            sorted_dict[k] = v
    return sorted_dict
221+
222+
209223
JinjaGlobalAttribute = t.Union[str, int, float, bool, AttributeDict]
210224

211225

@@ -440,7 +454,7 @@ def to_expressions(self) -> t.List[Expression]:
440454
d.PythonCode(
441455
expressions=[
442456
f"{k} = '{v}'" if isinstance(v, str) else f"{k} = {v}"
443-
for k, v in sorted(filtered_objs.items())
457+
for k, v in sort_dict_recursive(filtered_objs).items()
444458
]
445459
)
446460
)

tests/core/engine_adapter/test_bigquery.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -487,7 +487,13 @@ def temp_table_exists(table: exp.Table) -> bool:
487487
retry_resp_call.errors = None
488488
retry_mock.return_value = retry_resp
489489
db_call_mock.return_value = AttributeDict({"errors": None})
490-
df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
490+
df = pd.DataFrame(
491+
{
492+
"id": [1, 2, 3],
493+
"ts": ["2025-01-01 00:00:00", "2025-01-01 00:00:00", "2025-01-01 00:00:00"],
494+
"val": [7, 8, 9],
495+
}
496+
)
491497
adapter.merge(
492498
target_table="target",
493499
source_table=df,

tests/core/test_integration.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3232,6 +3232,62 @@ def test_virtual_environment_mode_dev_only_model_change_standalone_audit(
32323232
context.apply(plan)
32333233

32343234

3235+
@time_machine.travel("2023-01-08 15:00:00 UTC")
def test_virtual_environment_mode_dev_only_seed_model_change_schema(
    init_and_plan_context: t.Callable,
):
    """Add a column to a seed, consume it downstream, then plan to dev and prod.

    The seed CSV gets an extra ``new_column``; the downstream model is replaced
    with a query selecting that column. The dev plan must produce one row per
    seed data row, and after the prod plan the downstream table must expose
    the new column.
    """
    context, plan = init_and_plan_context(
        "examples/sushi", config="test_config_virtual_environment_mode_dev_only"
    )
    context.apply(plan)

    seed_path = context.path / "seeds" / "waiter_names.csv"

    # Rebuild the CSV with one extra column; the first non-blank line is the header.
    new_csv = []
    with open(seed_path, "r") as fd:
        for idx, raw_line in enumerate(fd):
            stripped = raw_line.strip()
            if not stripped:
                continue
            suffix = f",v{idx}" if new_csv else ",new_column"
            new_csv.append(stripped + suffix)

    with open(seed_path, "w") as fd:
        fd.write("\n".join(new_csv))

    context.load()

    downstream_model = context.get_model("sushi.waiter_as_customer_by_day")
    kind_overrides = {
        **downstream_model.kind.dict(),
        "on_destructive_change": "allow",
    }
    downstream_model_kwargs = {
        **downstream_model.dict(),
        "kind": kind_overrides,
        "audits": [],
        # Use the new column
        "query": "SELECT '2023-01-07' AS event_date, new_column AS new_column FROM sushi.waiter_names",
    }
    context.upsert_model(SqlModel.parse_obj(downstream_model_kwargs))

    context.plan("dev", auto_apply=True, no_prompts=True, skip_tests=True, enable_preview=True)

    # Every data row of the seed (all lines except the header) should be present in dev.
    dev_row = context.engine_adapter.fetchone(
        "SELECT COUNT(*) FROM sushi__dev.waiter_as_customer_by_day"
    )
    assert dev_row[0] == len(new_csv) - 1

    # Deploy to prod
    context.clear_caches()
    context.plan("prod", auto_apply=True, no_prompts=True, skip_tests=True)
    prod_columns = context.engine_adapter.columns("sushi.waiter_as_customer_by_day")
    assert "new_column" in prod_columns
3290+
32353291
@time_machine.travel("2023-01-08 15:00:00 UTC")
32363292
def test_restatement_plan_ignores_changes(init_and_plan_context: t.Callable):
32373293
context, plan = init_and_plan_context("examples/sushi")

tests/core/test_plan.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1214,6 +1214,7 @@ def test_seed_model_metadata_change_no_missing_intervals(
12141214
description="foo",
12151215
)
12161216
)
1217+
snapshot_a_metadata_updated.previous_versions = snapshot_a.all_versions
12171218
assert snapshot_a_metadata_updated.version is None
12181219
assert snapshot_a_metadata_updated.change_category is None
12191220

0 commit comments

Comments
 (0)