From 41aef66592b2f449d387b0e2afc1192ac2409ffe Mon Sep 17 00:00:00 2001 From: davidopaogun Date: Thu, 2 Jul 2026 15:36:04 +0100 Subject: [PATCH] fix: preserve dictionary encoding in to_arrow_batch_reader --- pyiceberg/table/__init__.py | 14 ++++++++ tests/io/test_pyarrow.py | 67 +++++++++++++++++++++++++++++++++++++ 2 files changed, 81 insertions(+) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 63b87d290e..a6db86825a 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -2196,6 +2196,20 @@ def _to_arrow_batch_reader_via_file_scan_tasks( from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow target_schema = schema_to_pyarrow(projected_schema) + if dictionary_columns: + """schema_to_pyarrow returns plain types. ArrowScan yields + dictionary-encoded batches for the requested columns, so rebuild + target_schema with dictionary types for those fields. Without this, + .cast(target_schema) would silently convert dictionary arrays back + to their plain value type and erase the encoding.""" + dict_col_set = set(dictionary_columns) + target_schema = pa.schema( + [ + field.with_type(pa.dictionary(pa.int32(), field.type)) if field.name in dict_col_set else field + for field in target_schema + ], + metadata=target_schema.metadata, + ) batches = ArrowScan( scan.table_metadata, scan.io, diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index 532311899d..58e5e124d9 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -5229,6 +5229,73 @@ def test_partition_column_projection_with_schema_evolution(catalog: InMemoryCata assert result_sorted["new_column"].to_pylist() == [None, None, "new1", "new2"] +def test_to_arrow_batch_reader_preserves_dictionary_columns(tmpdir: str) -> None: + """_to_arrow_batch_reader_via_file_scan_tasks must not strip dictionary encoding. + + Regression test for https://github.com/apache/iceberg-python/issues/3540. + Before the fix, RecordBatchReader.cast(target_schema) was called with a + plain schema, silently converting dictionary arrays back to their value + type so to_arrow_batch_reader(dictionary_columns=...).read_all() returned + plain strings instead of dictionary-encoded arrays. + """ + from pyiceberg.expressions import AlwaysTrue + from pyiceberg.io.pyarrow import PyArrowFileIO + from pyiceberg.partitioning import PartitionSpec + from pyiceberg.table import FileScanTask, _to_arrow_batch_reader_via_file_scan_tasks + from pyiceberg.table.metadata import TableMetadataV2 + + arrow_schema = pa.schema( + [ + pa.field("id", pa.int32(), nullable=True, metadata={PYARROW_PARQUET_FIELD_ID_KEY: "1"}), + pa.field("label", pa.string(), nullable=True, metadata={PYARROW_PARQUET_FIELD_ID_KEY: "2"}), + ] + ) + arrow_table = pa.table( + [pa.array([1, 2, 3, 4], type=pa.int32()), pa.array(["a", "b", "a", "b"], type=pa.string())], + schema=arrow_schema, + ) + data_file = _write_table_to_data_file(f"{tmpdir}/test_batch_reader_dict.parquet", arrow_schema, arrow_table) + data_file.spec_id = 0 + + iceberg_schema = Schema( + NestedField(1, "id", IntegerType(), required=False), + NestedField(2, "label", StringType(), required=False), + ) + table_metadata = TableMetadataV2( + location=f"file://{tmpdir}", + last_column_id=2, + format_version=2, + schemas=[iceberg_schema], + partition_specs=[PartitionSpec()], + ) + + class _MockScan: + def __init__(self) -> None: + self.table_metadata = table_metadata + self.io = PyArrowFileIO() + self.row_filter = AlwaysTrue() + self.case_sensitive = True + self.limit = None + + tasks = [FileScanTask(data_file)] + result = _to_arrow_batch_reader_via_file_scan_tasks( + _MockScan(), # type: ignore[arg-type] + iceberg_schema, + tasks, + dictionary_columns=("label",), + ).read_all() + + # label must be dictionary-encoded, not plain string + assert pa.types.is_dictionary(result.schema.field("label").type), ( + f"expected dictionary type, got {result.schema.field('label').type}" + ) + # id is not in dictionary_columns — must remain int32 + assert result.schema.field("id").type == pa.int32() + # Values must be identical to the source data + assert result.column("label").to_pylist() == ["a", "b", "a", "b"] + assert result.column("id").to_pylist() == [1, 2, 3, 4] + + def test_dictionary_columns_produces_dict_encoded_output(tmpdir: str) -> None: """dictionary_columns passed to ArrowScan must yield dictionary-encoded arrays.