From fb11ca1378860c6e4f85e7ed2a95fdb61d24e62c Mon Sep 17 00:00:00 2001 From: George Sittas Date: Fri, 17 Oct 2025 16:20:23 +0300 Subject: [PATCH] Fix: do not reduce empty generator when evaluating snapshots --- sqlmesh/core/snapshot/evaluator.py | 6 ++++ tests/core/test_snapshot_evaluator.py | 43 +++++++++++++++++++++++++++ 2 files changed, 49 insertions(+) diff --git a/sqlmesh/core/snapshot/evaluator.py b/sqlmesh/core/snapshot/evaluator.py index 762e4b91ec..1808011854 100644 --- a/sqlmesh/core/snapshot/evaluator.py +++ b/sqlmesh/core/snapshot/evaluator.py @@ -1021,6 +1021,11 @@ def apply(query_or_df: QueryOrDF, index: int = 0) -> None: ): import pandas as pd + try: + first_query_or_df = next(queries_or_dfs) + except StopIteration: + return + query_or_df = reduce( lambda a, b: ( pd.concat([a, b], ignore_index=True) # type: ignore @@ -1028,6 +1033,7 @@ def apply(query_or_df: QueryOrDF, index: int = 0) -> None: else a.union_all(b) # type: ignore ), # type: ignore queries_or_dfs, + first_query_or_df, ) apply(query_or_df, index=0) else: diff --git a/tests/core/test_snapshot_evaluator.py b/tests/core/test_snapshot_evaluator.py index 3a3a1a9376..9dd645ac15 100644 --- a/tests/core/test_snapshot_evaluator.py +++ b/tests/core/test_snapshot_evaluator.py @@ -1744,6 +1744,49 @@ def python_func(**kwargs): assert adapter_mock.insert_overwrite_by_time_partition.call_args[0][1].to_dict() == output_dict +def test_snapshot_evaluator_yield_empty_pd(adapter_mock, make_snapshot): + adapter_mock.is_pyspark_df.return_value = False + adapter_mock.INSERT_OVERWRITE_STRATEGY = InsertOverwriteStrategy.INSERT_OVERWRITE + adapter_mock.try_get_df = lambda x: x + evaluator = SnapshotEvaluator(adapter_mock) + + snapshot = make_snapshot( + PythonModel( + name="db.model", + entrypoint="python_func", + kind=IncrementalByTimeRangeKind(time_column=TimeColumn(column="ds", format="%Y-%m-%d")), + columns={ + "a": "INT", + "ds": "STRING", + }, + python_env={ + "python_func": Executable( + name="python_func", + alias="python_func", + path="test_snapshot_evaluator.py", + payload="""def python_func(**kwargs): + yield from ()""", + ) + }, + ) + ) + + snapshot.categorize_as(SnapshotChangeCategory.BREAKING) + evaluator.create([snapshot], {}) + + # This should not raise a TypeError from reduce() with empty sequence + evaluator.evaluate( + snapshot, + start="2023-01-01", + end="2023-01-09", + execution_time="2023-01-09", + snapshots={}, + ) + + # When there are no dataframes to process, insert_overwrite_by_time_partition should not be called + adapter_mock.insert_overwrite_by_time_partition.assert_not_called() + + def test_create_clone_in_dev(mocker: MockerFixture, adapter_mock, make_snapshot): adapter_mock.SUPPORTS_CLONING = True adapter_mock.get_alter_operations.return_value = []