From 2bd46d716814da769bb7a847f9c8eaf1f86f38b8 Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Mon, 15 Jun 2026 17:48:15 +0800 Subject: [PATCH] [python] Split row chunks that overflow the 2GB per-column limit on read Reading a table with a very large STRING/BYTES column could crash with "Cannot convert ChunkedArray to Array". A 65536-row chunk can exceed the 2GB per-column limit of pyarrow.string()/binary() (32-bit offsets), in which case pyarrow.array() returns a ChunkedArray that a single RecordBatch cannot hold. Convert the row-to-batch helpers into generators that detect the overflow (a column coming back as a ChunkedArray) and recursively split the rows in half, so every emitted RecordBatch keeps each column under the limit. A single row that still overflows raises a clear error instead of recursing forever. Both the serial and parallel read paths are updated accordingly. --- paimon-python/pypaimon/read/table_read.py | 60 ++++++-- .../tests/table_read_chunked_overflow_test.py | 130 ++++++++++++++++++ 2 files changed, 180 insertions(+), 10 deletions(-) create mode 100644 paimon-python/pypaimon/tests/table_read_chunked_overflow_test.py diff --git a/paimon-python/pypaimon/read/table_read.py b/paimon-python/pypaimon/read/table_read.py index 1c29025fdb9e..37312805778c 100644 --- a/paimon-python/pypaimon/read/table_read.py +++ b/paimon-python/pypaimon/read/table_read.py @@ -233,20 +233,18 @@ def _arrow_batch_generator(self, splits: List[Split], schema: pyarrow.Schema) -> break if len(row_tuple_chunk) >= chunk_size: - batch = self._convert_rows_to_arrow_batch_with_row_kind( + yield from self._convert_rows_to_arrow_batches_with_row_kind( row_tuple_chunk, row_kind_chunk, schema ) - yield batch row_tuple_chunk = [] row_kind_chunk = [] if stop: break if row_tuple_chunk: - batch = self._convert_rows_to_arrow_batch_with_row_kind( + yield from self._convert_rows_to_arrow_batches_with_row_kind( row_tuple_chunk, row_kind_chunk, schema ) - yield batch finally: reader.close() @@ -379,24 +377,29 @@ def _read_one_split_to_batches( row_kind_chunk.append(row.get_row_kind().to_string()) if len(row_tuple_chunk) >= chunk_size: - out.append(self._convert_rows_to_arrow_batch_with_row_kind( + out.extend(self._convert_rows_to_arrow_batches_with_row_kind( row_tuple_chunk, row_kind_chunk, schema)) row_tuple_chunk = [] row_kind_chunk = [] if row_tuple_chunk: - out.append(self._convert_rows_to_arrow_batch_with_row_kind( + out.extend(self._convert_rows_to_arrow_batches_with_row_kind( row_tuple_chunk, row_kind_chunk, schema)) finally: reader.close() return out - def _convert_rows_to_arrow_batch_with_row_kind( + def _convert_rows_to_arrow_batches_with_row_kind( self, row_tuples: List[tuple], row_kinds: List[str], schema: pyarrow.Schema - ) -> pyarrow.RecordBatch: - """Convert rows to Arrow batch, optionally including row kind column.""" + ) -> Iterator[pyarrow.RecordBatch]: + """Convert rows to one or more Arrow batches, optionally including row kind column. + + Yields more than one batch only when a column overflows pyarrow's 2GB + per-column limit (see ``_emit_overflow_safe_batches``); otherwise a single + batch is produced as before. + """ if not self.include_row_kind or not row_kinds: # No row kind - use original schema (without _row_kind column) data_schema = schema @@ -410,7 +413,44 @@ def _convert_rows_to_arrow_batch_with_row_kind( pydict = {ROW_KIND_COLUMN: row_kinds} for name, column in zip(data_field_names, columns_data): pydict[name] = list(column) - return pyarrow.RecordBatch.from_pydict(pydict, schema=schema) + yield from self._emit_overflow_safe_batches(pydict, len(row_tuples), schema) + + @staticmethod + def _emit_overflow_safe_batches( + pydict: Dict[str, list], + row_count: int, + schema: pyarrow.Schema, + ) -> Iterator[pyarrow.RecordBatch]: + """Yield RecordBatches from a ``{column: list}`` dict, keeping every column + within pyarrow's per-column size limit. + + A STRING/BYTES column maps to ``pyarrow.string()``/``binary()`` which use + 32-bit offsets (max 2GB per column). A chunk of large values can overflow + that, in which case ``pyarrow.array()`` returns a ``ChunkedArray`` that a + single ``RecordBatch`` cannot hold. When that happens we split the rows in + half and recurse so every emitted batch keeps each column under the limit. + """ + arrays = [] + for field in schema: + arr = pyarrow.array(pydict[field.name], type=field.type) + if isinstance(arr, pyarrow.ChunkedArray): + # A column overflowed the 2GB limit and was auto-chunked; split. + break + arrays.append(arr) + else: + yield pyarrow.RecordBatch.from_arrays(arrays, schema=schema) + return + + if row_count <= 1: + raise ValueError( + "A single row exceeds the 2GB per-column limit of " + "pyarrow.string()/binary(); cannot build a RecordBatch for this row." + ) + mid = row_count // 2 + left = {name: column[:mid] for name, column in pydict.items()} + right = {name: column[mid:] for name, column in pydict.items()} + yield from TableRead._emit_overflow_safe_batches(left, mid, schema) + yield from TableRead._emit_overflow_safe_batches(right, row_count - mid, schema) def _add_row_kind_column_to_batch( self, diff --git a/paimon-python/pypaimon/tests/table_read_chunked_overflow_test.py b/paimon-python/pypaimon/tests/table_read_chunked_overflow_test.py new file mode 100644 index 000000000000..fe2516cb0da8 --- /dev/null +++ b/paimon-python/pypaimon/tests/table_read_chunked_overflow_test.py @@ -0,0 +1,130 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +import unittest +from unittest.mock import patch + +import pyarrow as pa + +from pypaimon.read import table_read +from pypaimon.read.table_read import ROW_KIND_COLUMN, TableRead + +# Patching table_read.pyarrow.array also rebinds pa.array (same module object), +# so keep a handle to the genuine implementation for use inside the fakes. +_REAL_ARRAY = pa.array + + +def _make_table_read(include_row_kind: bool) -> TableRead: + # The conversion method only depends on self.include_row_kind, so build a + # bare instance instead of standing up a full catalog/table. + tr = TableRead.__new__(TableRead) + tr.include_row_kind = include_row_kind + return tr + + +class TableReadChunkedOverflowTest(unittest.TestCase): + """Reading PK tables with very large STRING columns used to crash because a + 65536-row chunk could overflow pyarrow.string()'s 2GB per-column limit, making + pyarrow.array() return a ChunkedArray that RecordBatch.from_arrays rejects. + + Instead of allocating 2GB, we patch pyarrow.array (as referenced by the module) + to simulate the auto-chunking once a column list grows past a small threshold, + and assert the chunk gets split into several single-Array batches. + """ + + OVERFLOW_THRESHOLD = 4 # pretend any column with > 4 rows overflows 2GB + + def _patched_array(self, obj, *args, **kwargs): + real = _REAL_ARRAY(obj, *args, **kwargs) + if isinstance(real, pa.Array) and len(real) > self.OVERFLOW_THRESHOLD: + # Emulate pyarrow auto-chunking a >2GB string column. + return pa.chunked_array([real]) + return real + + def _run(self, include_row_kind): + schema = pa.schema([("id", pa.int64()), ("payload", pa.string())]) + if include_row_kind: + schema = pa.schema( + [(ROW_KIND_COLUMN, pa.string())] + [(f.name, f.type) for f in schema] + ) + + num_rows = 10 + row_tuples = [(i, f"v{i}") for i in range(num_rows)] + row_kinds = ["+I"] * num_rows if include_row_kind else [] + + tr = _make_table_read(include_row_kind) + with patch.object(table_read.pyarrow, "array", side_effect=self._patched_array): + batches = list( + tr._convert_rows_to_arrow_batches_with_row_kind(row_tuples, row_kinds, schema) + ) + + # The single oversized chunk must be split into multiple batches. + self.assertGreater(len(batches), 1) + for b in batches: + self.assertEqual(b.schema, schema) + for col in b.columns: + self.assertIsInstance(col, pa.Array) + self.assertNotIsInstance(col, pa.ChunkedArray) + + merged = pa.Table.from_batches(batches) + self.assertEqual(merged.num_rows, num_rows) + self.assertEqual(merged.column("id").to_pylist(), [i for i in range(num_rows)]) + self.assertEqual( + merged.column("payload").to_pylist(), [f"v{i}" for i in range(num_rows)] + ) + if include_row_kind: + self.assertEqual( + merged.column(ROW_KIND_COLUMN).to_pylist(), ["+I"] * num_rows + ) + + def test_split_without_row_kind(self): + self._run(include_row_kind=False) + + def test_split_with_row_kind(self): + self._run(include_row_kind=True) + + def test_no_split_when_small(self): + # Below the overflow threshold a single batch is produced as before. + schema = pa.schema([("id", pa.int64()), ("payload", pa.string())]) + row_tuples = [(i, f"v{i}") for i in range(self.OVERFLOW_THRESHOLD)] + + tr = _make_table_read(include_row_kind=False) + with patch.object(table_read.pyarrow, "array", side_effect=self._patched_array): + batches = list( + tr._convert_rows_to_arrow_batches_with_row_kind(row_tuples, [], schema) + ) + + self.assertEqual(len(batches), 1) + self.assertEqual(batches[0].num_rows, self.OVERFLOW_THRESHOLD) + + def test_single_row_overflow_raises(self): + # A single row that still overflows cannot be represented; guard against + # infinite recursion with a clear error. + schema = pa.schema([("payload", pa.string())]) + + def always_chunk(obj, *args, **kwargs): + return pa.chunked_array([_REAL_ARRAY(obj, *args, **kwargs)]) + + tr = _make_table_read(include_row_kind=False) + with patch.object(table_read.pyarrow, "array", side_effect=always_chunk): + with self.assertRaises(ValueError): + list(tr._convert_rows_to_arrow_batches_with_row_kind([("x",)], [], schema)) + + +if __name__ == "__main__": + unittest.main()