Skip to content

Commit df71d2a

Browse files
GayathriSrividya, Gayathri Srividya Rajavarapu, claude, Fokko
authored
Fix duplicate filtering path in Arrow task batches (#3448)
Closes #3272

## What this changes

This PR updates the Arrow scan path in `_task_to_record_batches` to avoid redundant filtering when there are no positional deletes.

- Keeps predicate pushdown in `Scanner.from_fragment` as the only filter path when `positional_deletes` is absent.
- Applies `current_batch.filter(pyarrow_filter)` only in the positional-delete path, after deletes are applied.
- Preserves empty-batch handling after both delete application and conditional filtering.

## Why

The previous flow could perform an extra table-level refilter even when the scanner already applied the predicate. This change removes that stale workaround path while keeping correct behavior for positional delete scenarios.

## Tests

Added regression coverage in `tests/io/test_pyarrow.py`:

- `test_task_to_record_batches_filter_without_positional_deletes_avoids_table_refilter`
- `test_task_to_record_batches_filter_with_positional_deletes_handles_empty_batch`

Validated locally:

- `python -m pytest tests/io/test_pyarrow.py -q -k "task_to_record_batches_nanos or filter_without_positional_deletes_avoids_table_refilter or filter_with_positional_deletes_handles_empty_batch"`
- `make lint`

---------

Co-authored-by: Gayathri Srividya Rajavarapu <gayathrir@Gayathris-MacBook-Air.local>
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-authored-by: Fokko Driesprong <fokko@apache.org>
1 parent fb96df1 commit df71d2a

2 files changed

Lines changed: 134 additions & 11 deletions

File tree

pyiceberg/io/pyarrow.py

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1680,22 +1680,21 @@ def _task_to_record_batches(
16801680
# Create the mask of indices that we're interested in
16811681
indices = _combine_positional_deletes(positional_deletes, current_index, current_index + len(batch))
16821682
current_batch = current_batch.take(indices)
1683+
if pyarrow_filter is not None:
1684+
# Temporary fix until PyArrow 21 is the minimum supported version
1685+
# (https://github.com/apache/arrow/pull/46057): RecordBatch.filter raises
1686+
# IndexError on PyArrow <21 when the result is empty; Table.filter does not.
1687+
table = pa.Table.from_batches([current_batch])
1688+
table = table.filter(pyarrow_filter)
1689+
if table.num_rows == 0:
1690+
current_batch = current_batch.slice(0, 0)
1691+
else:
1692+
current_batch = table.combine_chunks().to_batches()[0]
16831693

16841694
# skip empty batches
16851695
if current_batch.num_rows == 0:
16861696
continue
16871697

1688-
# Apply the user filter
1689-
if pyarrow_filter is not None:
1690-
# Temporary fix until PyArrow 21 is released ( https://github.com/apache/arrow/pull/46057 )
1691-
table = pa.Table.from_batches([current_batch])
1692-
table = table.filter(pyarrow_filter)
1693-
# skip empty batches
1694-
if table.num_rows == 0:
1695-
continue
1696-
1697-
current_batch = table.combine_chunks().to_batches()[0]
1698-
16991698
yield _to_requested_schema(
17001699
projected_schema,
17011700
file_project_schema,

tests/io/test_pyarrow.py

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3267,6 +3267,130 @@ def _expected_batch(unit: str) -> pa.RecordBatch:
32673267
assert _expected_batch("ns" if format_version > 2 else "us").equals(actual_result)
32683268

32693269

3270+
def test_task_to_record_batches_scanner_filter_not_set_with_positional_deletes(tmpdir: str) -> None:
3271+
"""Regression test for https://github.com/apache/iceberg-python/issues/3272.
3272+
3273+
When positional deletes are present the scanner must NOT receive the row filter as a
3274+
push-down predicate. Positional-delete indices reference absolute row positions in the
3275+
original file; if the scanner filters rows first the surviving rows shift and the
3276+
indices no longer map correctly, producing silently wrong results.
3277+
3278+
The test chooses data where the old (buggy) code path gives a distinct wrong answer:
3279+
- File rows (positions 0-3): [1, 2, 3, 4]
3280+
- Positional delete: position 2 → removes value 3 → survivors [1, 2, 4]
3281+
- Row filter: id > 2 → expected result [4]
3282+
3283+
Old bug (scanner pre-filters id > 2 → [3, 4], then _combine_positional_deletes sees only
3284+
2 rows so absolute position 2 is outside the batch range and nothing is deleted → [3, 4]).
3285+
"""
3286+
from pyiceberg.expressions.visitors import bind
3287+
3288+
arrow_schema = pa.schema((pa.field("id", pa.int32(), nullable=True, metadata={PYARROW_PARQUET_FIELD_ID_KEY: "1"}),))
3289+
# File row positions: 0→1, 1→2, 2→3, 3→4
3290+
arrow_table = pa.table([pa.array([1, 2, 3, 4], type=pa.int32())], schema=arrow_schema)
3291+
data_file = _write_table_to_data_file(
3292+
f"{tmpdir}/test_scanner_filter_not_set_with_pos_deletes.parquet", arrow_schema, arrow_table
3293+
)
3294+
3295+
table_schema = Schema(NestedField(1, "id", IntegerType(), required=False))
3296+
3297+
positional_deletes = [pa.chunked_array([pa.array([2], type=pa.int64())])]
3298+
result_batches = list(
3299+
_task_to_record_batches(
3300+
PyArrowFileIO(),
3301+
FileScanTask(data_file),
3302+
bound_row_filter=bind(table_schema, GreaterThan("id", 2), case_sensitive=True),
3303+
projected_schema=table_schema,
3304+
table_schema=table_schema,
3305+
projected_field_ids={1},
3306+
positional_deletes=positional_deletes,
3307+
case_sensitive=True,
3308+
)
3309+
)
3310+
3311+
assert len(result_batches) == 1
3312+
assert result_batches[0].column(0).to_pylist() == [4]
3313+
3314+
3315+
def test_task_to_record_batches_filter_applied_after_positional_deletes(tmpdir: str) -> None:
3316+
"""Regression test: the row filter must be applied *after* positional deletes are removed.
3317+
3318+
When positional deletes are present the scanner does not push down the predicate, so
3319+
``_task_to_record_batches`` must apply ``pyarrow_filter`` explicitly after ``take``.
3320+
This test uses data where the expected result differs from both
3321+
"filter only" and "deletes only" projections, ensuring that skipping either step
3322+
would produce the wrong answer.
3323+
"""
3324+
from pyiceberg.expressions.visitors import bind
3325+
3326+
arrow_schema = pa.schema((pa.field("id", pa.int32(), nullable=True, metadata={PYARROW_PARQUET_FIELD_ID_KEY: "1"}),))
3327+
# File rows (0-indexed positions): 0→1, 1→2, 2→3, 3→4, 4→5
3328+
arrow_table = pa.table([pa.array([1, 2, 3, 4, 5], type=pa.int32())], schema=arrow_schema)
3329+
data_file = _write_table_to_data_file(
3330+
f"{tmpdir}/test_task_to_record_batches_filter_with_positional.parquet", arrow_schema, arrow_table
3331+
)
3332+
3333+
table_schema = Schema(NestedField(1, "id", IntegerType(), required=False))
3334+
3335+
# Delete file-positions 1 and 3 (values 2 and 4); survivors: [1, 3, 5]
3336+
# Then apply filter id > 2; expected result: [3, 5]
3337+
#
3338+
# Wrong results that would indicate a bug:
3339+
# [1, 3, 5] — filter not applied after deletes
3340+
# [3, 4, 5] — filter applied but positional deletes not applied
3341+
positional_deletes = [pa.chunked_array([pa.array([1, 3], type=pa.int64())])]
3342+
result_batches = list(
3343+
_task_to_record_batches(
3344+
PyArrowFileIO(),
3345+
FileScanTask(data_file),
3346+
bound_row_filter=bind(table_schema, GreaterThan("id", 2), case_sensitive=True),
3347+
projected_schema=table_schema,
3348+
table_schema=table_schema,
3349+
projected_field_ids={1},
3350+
positional_deletes=positional_deletes,
3351+
case_sensitive=True,
3352+
)
3353+
)
3354+
3355+
assert len(result_batches) == 1
3356+
assert result_batches[0].column(0).to_pylist() == [3, 5]
3357+
3358+
3359+
def test_task_to_record_batches_filter_after_positional_deletes_empty_result(tmpdir: str) -> None:
3360+
"""Regression: filter after positional deletes must not raise even when the result is empty.
3361+
3362+
PyArrow < 21 raises IndexError from RecordBatch.filter(Expression) when the result has
3363+
zero rows (fixed in https://github.com/apache/arrow/pull/46057). This test ensures the
3364+
positional-delete path handles that case gracefully and yields no batches.
3365+
"""
3366+
from pyiceberg.expressions.visitors import bind
3367+
3368+
arrow_schema = pa.schema((pa.field("id", pa.int32(), nullable=True, metadata={PYARROW_PARQUET_FIELD_ID_KEY: "1"}),))
3369+
arrow_table = pa.table([pa.array([1, 2, 3], type=pa.int32())], schema=arrow_schema)
3370+
data_file = _write_table_to_data_file(
3371+
f"{tmpdir}/test_filter_after_positional_deletes_empty_result.parquet", arrow_schema, arrow_table
3372+
)
3373+
3374+
table_schema = Schema(NestedField(1, "id", IntegerType(), required=False))
3375+
3376+
# No rows deleted, but filter (id > 10) eliminates all rows → must return empty
3377+
positional_deletes = [pa.chunked_array([pa.array([], type=pa.int64())])]
3378+
result_batches = list(
3379+
_task_to_record_batches(
3380+
PyArrowFileIO(),
3381+
FileScanTask(data_file),
3382+
bound_row_filter=bind(table_schema, GreaterThan("id", 10), case_sensitive=True),
3383+
projected_schema=table_schema,
3384+
table_schema=table_schema,
3385+
projected_field_ids={1},
3386+
positional_deletes=positional_deletes,
3387+
case_sensitive=True,
3388+
)
3389+
)
3390+
3391+
assert result_batches == []
3392+
3393+
32703394
def test_parse_location_defaults() -> None:
32713395
"""Test that parse_location uses defaults."""
32723396

0 commit comments

Comments
 (0)