Skip to content

Commit 77ce71c

Browse files
authored
Fix: do not reduce empty generator when evaluating snapshots (#5548)
1 parent 88c5d38 commit 77ce71c

File tree

2 files changed

+49
-0
lines changed

2 files changed

+49
-0
lines changed

sqlmesh/core/snapshot/evaluator.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1021,13 +1021,19 @@ def apply(query_or_df: QueryOrDF, index: int = 0) -> None:
10211021
):
10221022
import pandas as pd
10231023

1024+
try:
1025+
first_query_or_df = next(queries_or_dfs)
1026+
except StopIteration:
1027+
return
1028+
10241029
query_or_df = reduce(
10251030
lambda a, b: (
10261031
pd.concat([a, b], ignore_index=True) # type: ignore
10271032
if isinstance(a, pd.DataFrame)
10281033
else a.union_all(b) # type: ignore
10291034
), # type: ignore
10301035
queries_or_dfs,
1036+
first_query_or_df,
10311037
)
10321038
apply(query_or_df, index=0)
10331039
else:

tests/core/test_snapshot_evaluator.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1744,6 +1744,49 @@ def python_func(**kwargs):
17441744
assert adapter_mock.insert_overwrite_by_time_partition.call_args[0][1].to_dict() == output_dict
17451745

17461746

1747+
def test_snapshot_evaluator_yield_empty_pd(adapter_mock, make_snapshot):
1748+
adapter_mock.is_pyspark_df.return_value = False
1749+
adapter_mock.INSERT_OVERWRITE_STRATEGY = InsertOverwriteStrategy.INSERT_OVERWRITE
1750+
adapter_mock.try_get_df = lambda x: x
1751+
evaluator = SnapshotEvaluator(adapter_mock)
1752+
1753+
snapshot = make_snapshot(
1754+
PythonModel(
1755+
name="db.model",
1756+
entrypoint="python_func",
1757+
kind=IncrementalByTimeRangeKind(time_column=TimeColumn(column="ds", format="%Y-%m-%d")),
1758+
columns={
1759+
"a": "INT",
1760+
"ds": "STRING",
1761+
},
1762+
python_env={
1763+
"python_func": Executable(
1764+
name="python_func",
1765+
alias="python_func",
1766+
path="test_snapshot_evaluator.py",
1767+
payload="""def python_func(**kwargs):
1768+
yield from ()""",
1769+
)
1770+
},
1771+
)
1772+
)
1773+
1774+
snapshot.categorize_as(SnapshotChangeCategory.BREAKING)
1775+
evaluator.create([snapshot], {})
1776+
1777+
# This should not raise a TypeError from reduce() with empty sequence
1778+
evaluator.evaluate(
1779+
snapshot,
1780+
start="2023-01-01",
1781+
end="2023-01-09",
1782+
execution_time="2023-01-09",
1783+
snapshots={},
1784+
)
1785+
1786+
# When there are no dataframes to process, insert_overwrite_by_time_partition should not be called
1787+
adapter_mock.insert_overwrite_by_time_partition.assert_not_called()
1788+
1789+
17471790
def test_create_clone_in_dev(mocker: MockerFixture, adapter_mock, make_snapshot):
17481791
adapter_mock.SUPPORTS_CLONING = True
17491792
adapter_mock.get_alter_operations.return_value = []

0 commit comments

Comments
 (0)