Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 50 additions & 10 deletions paimon-python/pypaimon/read/table_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,20 +233,18 @@ def _arrow_batch_generator(self, splits: List[Split], schema: pyarrow.Schema) ->
break

if len(row_tuple_chunk) >= chunk_size:
batch = self._convert_rows_to_arrow_batch_with_row_kind(
yield from self._convert_rows_to_arrow_batches_with_row_kind(
row_tuple_chunk, row_kind_chunk, schema
)
yield batch
row_tuple_chunk = []
row_kind_chunk = []
if stop:
break

if row_tuple_chunk:
batch = self._convert_rows_to_arrow_batch_with_row_kind(
yield from self._convert_rows_to_arrow_batches_with_row_kind(
row_tuple_chunk, row_kind_chunk, schema
)
yield batch
finally:
reader.close()

Expand Down Expand Up @@ -379,24 +377,29 @@ def _read_one_split_to_batches(
row_kind_chunk.append(row.get_row_kind().to_string())

if len(row_tuple_chunk) >= chunk_size:
out.append(self._convert_rows_to_arrow_batch_with_row_kind(
out.extend(self._convert_rows_to_arrow_batches_with_row_kind(
row_tuple_chunk, row_kind_chunk, schema))
row_tuple_chunk = []
row_kind_chunk = []
if row_tuple_chunk:
out.append(self._convert_rows_to_arrow_batch_with_row_kind(
out.extend(self._convert_rows_to_arrow_batches_with_row_kind(
row_tuple_chunk, row_kind_chunk, schema))
finally:
reader.close()
return out

def _convert_rows_to_arrow_batch_with_row_kind(
def _convert_rows_to_arrow_batches_with_row_kind(
self,
row_tuples: List[tuple],
row_kinds: List[str],
schema: pyarrow.Schema
) -> pyarrow.RecordBatch:
"""Convert rows to Arrow batch, optionally including row kind column."""
) -> Iterator[pyarrow.RecordBatch]:
"""Convert rows to one or more Arrow batches, optionally including row kind column.

Yields more than one batch only when a column overflows pyarrow's 2GB
per-column limit (see ``_emit_overflow_safe_batches``); otherwise a single
batch is produced as before.
"""
if not self.include_row_kind or not row_kinds:
# No row kind - use original schema (without _row_kind column)
data_schema = schema
Expand All @@ -410,7 +413,44 @@ def _convert_rows_to_arrow_batch_with_row_kind(
pydict = {ROW_KIND_COLUMN: row_kinds}
for name, column in zip(data_field_names, columns_data):
pydict[name] = list(column)
return pyarrow.RecordBatch.from_pydict(pydict, schema=schema)
yield from self._emit_overflow_safe_batches(pydict, len(row_tuples), schema)

@staticmethod
def _emit_overflow_safe_batches(
pydict: Dict[str, list],
row_count: int,
schema: pyarrow.Schema,
) -> Iterator[pyarrow.RecordBatch]:
"""Yield RecordBatches from a ``{column: list}`` dict, keeping every column
within pyarrow's per-column size limit.

A STRING/BYTES column maps to ``pyarrow.string()``/``binary()`` which use
32-bit offsets (max 2GB per column). A chunk of large values can overflow
that, in which case ``pyarrow.array()`` returns a ``ChunkedArray`` that a
single ``RecordBatch`` cannot hold. When that happens we split the rows in
half and recurse so every emitted batch keeps each column under the limit.
"""
arrays = []
for field in schema:
arr = pyarrow.array(pydict[field.name], type=field.type)
if isinstance(arr, pyarrow.ChunkedArray):
# A column overflowed the 2GB limit and was auto-chunked; split.
break
arrays.append(arr)
else:
yield pyarrow.RecordBatch.from_arrays(arrays, schema=schema)
return

if row_count <= 1:
raise ValueError(
"A single row exceeds the 2GB per-column limit of "
"pyarrow.string()/binary(); cannot build a RecordBatch for this row."
)
mid = row_count // 2
left = {name: column[:mid] for name, column in pydict.items()}
right = {name: column[mid:] for name, column in pydict.items()}
yield from TableRead._emit_overflow_safe_batches(left, mid, schema)
yield from TableRead._emit_overflow_safe_batches(right, row_count - mid, schema)

def _add_row_kind_column_to_batch(
self,
Expand Down
130 changes: 130 additions & 0 deletions paimon-python/pypaimon/tests/table_read_chunked_overflow_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

import unittest
from unittest.mock import patch

import pyarrow as pa

from pypaimon.read import table_read
from pypaimon.read.table_read import ROW_KIND_COLUMN, TableRead

# Patching table_read.pyarrow.array also rebinds pa.array (same module object),
# so keep a handle to the genuine implementation for use inside the fakes.
_REAL_ARRAY = pa.array


def _make_table_read(include_row_kind: bool) -> TableRead:
# The conversion method only depends on self.include_row_kind, so build a
# bare instance instead of standing up a full catalog/table.
tr = TableRead.__new__(TableRead)
tr.include_row_kind = include_row_kind
return tr


class TableReadChunkedOverflowTest(unittest.TestCase):
"""Reading PK tables with very large STRING columns used to crash because a
65536-row chunk could overflow pyarrow.string()'s 2GB per-column limit, making
pyarrow.array() return a ChunkedArray that RecordBatch.from_arrays rejects.

Instead of allocating 2GB, we patch pyarrow.array (as referenced by the module)
to simulate the auto-chunking once a column list grows past a small threshold,
and assert the chunk gets split into several single-Array batches.
"""

OVERFLOW_THRESHOLD = 4 # pretend any column with > 4 rows overflows 2GB

def _patched_array(self, obj, *args, **kwargs):
real = _REAL_ARRAY(obj, *args, **kwargs)
if isinstance(real, pa.Array) and len(real) > self.OVERFLOW_THRESHOLD:
# Emulate pyarrow auto-chunking a >2GB string column.
return pa.chunked_array([real])
return real

def _run(self, include_row_kind):
schema = pa.schema([("id", pa.int64()), ("payload", pa.string())])
if include_row_kind:
schema = pa.schema(
[(ROW_KIND_COLUMN, pa.string())] + [(f.name, f.type) for f in schema]
)

num_rows = 10
row_tuples = [(i, f"v{i}") for i in range(num_rows)]
row_kinds = ["+I"] * num_rows if include_row_kind else []

tr = _make_table_read(include_row_kind)
with patch.object(table_read.pyarrow, "array", side_effect=self._patched_array):
batches = list(
tr._convert_rows_to_arrow_batches_with_row_kind(row_tuples, row_kinds, schema)
)

# The single oversized chunk must be split into multiple batches.
self.assertGreater(len(batches), 1)
for b in batches:
self.assertEqual(b.schema, schema)
for col in b.columns:
self.assertIsInstance(col, pa.Array)
self.assertNotIsInstance(col, pa.ChunkedArray)

merged = pa.Table.from_batches(batches)
self.assertEqual(merged.num_rows, num_rows)
self.assertEqual(merged.column("id").to_pylist(), [i for i in range(num_rows)])
self.assertEqual(
merged.column("payload").to_pylist(), [f"v{i}" for i in range(num_rows)]
)
if include_row_kind:
self.assertEqual(
merged.column(ROW_KIND_COLUMN).to_pylist(), ["+I"] * num_rows
)

def test_split_without_row_kind(self):
self._run(include_row_kind=False)

def test_split_with_row_kind(self):
self._run(include_row_kind=True)

def test_no_split_when_small(self):
# Below the overflow threshold a single batch is produced as before.
schema = pa.schema([("id", pa.int64()), ("payload", pa.string())])
row_tuples = [(i, f"v{i}") for i in range(self.OVERFLOW_THRESHOLD)]

tr = _make_table_read(include_row_kind=False)
with patch.object(table_read.pyarrow, "array", side_effect=self._patched_array):
batches = list(
tr._convert_rows_to_arrow_batches_with_row_kind(row_tuples, [], schema)
)

self.assertEqual(len(batches), 1)
self.assertEqual(batches[0].num_rows, self.OVERFLOW_THRESHOLD)

def test_single_row_overflow_raises(self):
# A single row that still overflows cannot be represented; guard against
# infinite recursion with a clear error.
schema = pa.schema([("payload", pa.string())])

def always_chunk(obj, *args, **kwargs):
return pa.chunked_array([_REAL_ARRAY(obj, *args, **kwargs)])

tr = _make_table_read(include_row_kind=False)
with patch.object(table_read.pyarrow, "array", side_effect=always_chunk):
with self.assertRaises(ValueError):
list(tr._convert_rows_to_arrow_batches_with_row_kind([("x",)], [], schema))


if __name__ == "__main__":
unittest.main()
Loading